
RF

There are ML classification and ML regression model APIs for the RF algorithm.

Model API Type          Function API

ML Classification API   def fit(dataset: Dataset[_]): RandomForestClassificationModel
                        def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel
                        def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]
                        def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel

ML Regression API       def fit(dataset: Dataset[_]): RandomForestRegressionModel
                        def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel
                        def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]
                        def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel

ML Classification API

  • Function

    This API imports sample data in Dataset format, calls the training API, and outputs an RF classification model.

  • Input and output
    1. Package name: org.apache.spark.ml.classification
    2. Class name: RandomForestClassifier
    3. Method name: fit
    4. Input: training sample data (Dataset[_]). The following fields are mandatory.

      Param name    Type(s)   Default      Description
      labelCol      Double    "label"      Label column
      featuresCol   Vector    "features"   Feature vector column

    5. Input: the fit API model parameters paramMap, paramMaps, firstParamPair, and otherParamPairs, described as follows:

      Param name        Type(s)           Example              Description
      paramMap          ParamMap          ParamMap(A.c -> b)   Assigns the value b to the parameter c of model A.
      paramMaps         Array[ParamMap]   Array[ParamMap](n)   Generates n ParamMap model parameter lists.
      firstParamPair    ParamPair         ParamPair(A.c, b)    Assigns the value b to the parameter c of model A.
      otherParamPairs   ParamPair         ParamPair(A.e, f)    Assigns the value f to the parameter e of model A.

    6. Algorithm parameters

      Algorithm Parameter

      def setCheckpointInterval(value: Int): RandomForestClassifier.this.type

      def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type

      def setFeaturesCol(value: String): RandomForestClassifier

      def setImpurity(value: String): RandomForestClassifier.this.type

      def setLabelCol(value: String): RandomForestClassifier

      def setMaxBins(value: Int): RandomForestClassifier.this.type

      def setMaxDepth(value: Int): RandomForestClassifier.this.type

      def setMinInfoGain(value: Double): RandomForestClassifier.this.type

      def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type

      def setNumTrees(value: Int): RandomForestClassifier.this.type

      def setPredictionCol(value: String): RandomForestClassifier

      def setProbabilityCol(value: String): RandomForestClassifier

      def setRawPredictionCol(value: String): RandomForestClassifier

      def setSeed(value: Long): RandomForestClassifier.this.type

      def setSubsamplingRate(value: Double): RandomForestClassifier.this.type

      def setThresholds(value: Array[Double]): RandomForestClassifier
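
      Each of the setters above returns the classifier itself, so they can be chained fluently. A minimal sketch follows; the parameter values are illustrative, not tuning recommendations:

      ```scala
      import org.apache.spark.ml.classification.RandomForestClassifier

      // Chain the setters listed above; each call returns the classifier,
      // so the configuration composes into a single expression.
      val rf = new RandomForestClassifier()
        .setNumTrees(20)                  // number of trees in the forest
        .setMaxDepth(10)                  // maximum depth of each tree
        .setMaxBins(32)                   // bins for discretizing continuous features
        .setImpurity("gini")              // split criterion: "gini" or "entropy"
        .setFeatureSubsetStrategy("sqrt") // features considered at each split
        .setSubsamplingRate(0.8)          // fraction of data sampled per tree
        .setSeed(42L)                     // fixed seed for reproducibility

      // Every setter has a matching getter that reflects the configured value.
      println(rf.getNumTrees)
      ```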

    7. Added algorithm parameters

      numTrainingDataCopies (spark.sophon.ml.rf.numTrainingDataCopies)
        Number of training data copies. Integer type. The value must be
        greater than or equal to 1 (default: 1).

      broadcastVariables (spark.sophon.ml.rf.broadcastVariables)
        Whether to broadcast variables that have large storage space.
        Boolean type. The default value is false.

      numPartsPerTrainingDataCopy (spark.sophon.ml.rf.numPartsPerTrainingDataCopy)
        Number of partitions of a single training data copy. Integer type.
        The value must be greater than or equal to 0 (default: 0, indicating
        that re-partitioning is not performed).

      binnedFeaturesDataType (spark.sophon.ml.rf.binnedFeaturesDataType)
        Storage format of features in training sample data. String type.
        The supported storage formats are array (default) and fasthashmap.

      An example is provided as follows:

      import org.apache.spark.ml.param.{ParamMap, ParamPair}

      val rf = new RandomForestClassifier() // Define the random forest classifier.

      // Define the def fit(dataset: Dataset[_], paramMap: ParamMap) API parameter.
      val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

      // Define the def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]) API parameter.
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) { // Iterate over valid indices only.
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      } // Assign values to paramMaps.

      // Define the def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*) API parameters.
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

      // Call the fit APIs.
      val model1 = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
    8. Example code for setting the new parameter:
      // spark: SparkSession
      spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    9. Output: RF classification model (RandomForestClassificationModel). The following table lists the field output in model prediction.

      Param name      Type(s)   Default        Description
      predictionCol   Double    "prediction"   Predicted label

  • Sample usage
    fit(dataset: Dataset[_]): RandomForestClassificationModel example:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)
    
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    .setNumTrees(10)
    
    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)
    
    // Chain indexers and forest in a Pipeline.
    val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
    
    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")
    
    val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
    println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
  • Sample result
    +--------------+-----+--------------------+
    |predictedLabel|label|            features|
    +--------------+-----+--------------------+
    |           1.0|  1.0|(47236,[270,439,5...|
    |           1.0|  1.0|(47236,[3023,6093...|
    |          -1.0| -1.0|(47236,[270,391,4...|
    |          -1.0| -1.0|(47236,[3718,3723...|
    |           1.0|  1.0|(47236,[729,760,1...|
    +--------------+-----+--------------------+
    only showing top 5 rows
    Test Error = 0.06476632743800015

ML Regression API

  • Function

    This API imports sample data in Dataset format, calls the training API, and outputs an RF regression model.

  • Input and output
    1. Package name: org.apache.spark.ml.regression
    2. Class name: RandomForestRegressor
    3. Method name: fit
    4. Input: training sample data (Dataset[_]). The following fields are mandatory.

      Param name    Type(s)   Default      Description
      labelCol      Double    "label"      Label column
      featuresCol   Vector    "features"   Feature vector column

    5. Input: the fit API model parameters paramMap, paramMaps, firstParamPair, and otherParamPairs, described as follows:

      Param name        Type(s)           Example              Description
      paramMap          ParamMap          ParamMap(A.c -> b)   Assigns the value b to the parameter c of model A.
      paramMaps         Array[ParamMap]   Array[ParamMap](n)   Generates n ParamMap model parameter lists.
      firstParamPair    ParamPair         ParamPair(A.c, b)    Assigns the value b to the parameter c of model A.
      otherParamPairs   ParamPair         ParamPair(A.e, f)    Assigns the value f to the parameter e of model A.

    6. Algorithm parameters

      Algorithm Parameter

      def setCheckpointInterval(value: Int): RandomForestRegressor.this.type

      def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type

      def setFeaturesCol(value: String): RandomForestRegressor

      def setImpurity(value: String): RandomForestRegressor.this.type

      def setLabelCol(value: String): RandomForestRegressor

      def setMaxBins(value: Int): RandomForestRegressor.this.type

      def setMaxDepth(value: Int): RandomForestRegressor.this.type

      def setMinInfoGain(value: Double): RandomForestRegressor.this.type

      def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type

      def setNumTrees(value: Int): RandomForestRegressor.this.type

      def setPredictionCol(value: String): RandomForestRegressor

      def setSeed(value: Long): RandomForestRegressor.this.type

      def setSubsamplingRate(value: Double): RandomForestRegressor.this.type
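
      As with the classifier, these setters return the regressor itself and can be chained. A minimal sketch with illustrative values:

      ```scala
      import org.apache.spark.ml.regression.RandomForestRegressor

      // Chain the regression setters listed above into one expression.
      val rf = new RandomForestRegressor()
        .setNumTrees(50)             // number of trees in the forest
        .setMaxDepth(8)              // maximum depth of each tree
        .setMinInstancesPerNode(2)   // minimum samples required per leaf
        .setImpurity("variance")     // "variance" is the impurity for regression
        .setSubsamplingRate(0.7)     // fraction of data sampled per tree
        .setSeed(7L)                 // fixed seed for reproducibility

      // Configured values are readable through the matching getters.
      println(rf.getNumTrees)
      ```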

    7. Added algorithm parameters

      Parameter

      spark conf Parameter Name

      Description

      Type

      numTrainingD ataCopies

      spark.sophon.ml.rf. numTrainingDataC opies

      Number of training data copies

      Integer type. The value must be greater than or equal to 1 (default).

      broadcastVaria bles

      spark.sophon.ml.rf. broadcastVariables

      Whether to broadcast variables that have large storage space

      Boolean type. The default value is false.

      numPartsPerTr ainingDataCop y

      spark.sophon.ml.rf. numPartsPerTraini ngDataCopy

      Number of partitions of a single training data copy

      Integer type. The value must be greater than or equal to 0 (default, indicating that re-partitioning is not performed).

      binnedFeature sDataType

      spark.sophon.ml.rf. binnedFeaturesDat aType

      Storage format of features in training sample data

      String type. The value can be array (default) or fasthashmap.

      An example is provided as follows:

      import org.apache.spark.ml.param.{ParamMap, ParamPair}

      val rf = new RandomForestRegressor() // Define the random forest regressor.

      // Define the def fit(dataset: Dataset[_], paramMap: ParamMap) API parameter.
      val paramMap = ParamMap(rf.maxDepth -> maxDepth)
        .put(rf.numTrees, numTrees)

      // Define the def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]) API parameter.
      val paramMaps = new Array[ParamMap](2)
      for (i <- paramMaps.indices) { // Iterate over valid indices only.
        paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth)
          .put(rf.numTrees, numTrees)
      } // Assign values to paramMaps.

      // Define the def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*) API parameters.
      val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

      // Call the fit APIs.
      val model1 = rf.fit(trainingData)
      val model2 = rf.fit(trainingData, paramMap)
      val models = rf.fit(trainingData, paramMaps)
      val model3 = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
    8. Example code for setting the new parameter
      // spark: SparkSession
      spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
      spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
      spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
      spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
      val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    9. Output: RF regression model (RandomForestRegressionModel or Seq[RandomForestRegressionModel]). The following table lists the field output in model prediction.

      Param name      Type(s)   Default        Description
      predictionCol   Double    "prediction"   Predicted label

  • Sample usage
    fit(dataset: Dataset[_]): RandomForestRegressionModel example:
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
    
    // Load and parse the data file, converting it to a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a RandomForest model.
    val rf = new RandomForestRegressor()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")
    
    // Chain indexer and forest in a Pipeline.
    val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, rf))
    
    // Train model. This also runs the indexer.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("prediction", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
    val rmse = evaluator.evaluate(predictions)
    println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
    
    val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
    println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
  • Sample result
    +----------+-----+--------------------+
    |prediction|label|            features|
    +----------+-----+--------------------+
    |      0.51|  0.3|(1000,[0,1,2,3,4,...|
    +----------+-----+--------------------+
    
    root mean squared error = 0.21000000000000002

API applicability:

  • The algorithm APIs in this section apply to the Kunpeng-empowered HDP platform. For the Java development environment, JDK 1.8 or later is required. For the Spark development environment, Spark 2.3.2 is required. HDP 3.1.0 or later is required.
  • Components that need to be deployed for this algorithm to run on the HDP platform include HDFS, Spark2, Yarn, ZooKeeper, Hive, and MapReduce2.
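
Assuming a standard YARN deployment of the components above, a training job using these APIs might be submitted as sketched below. The driver class and jar names are hypothetical placeholders; only the spark.sophon.ml.rf.* parameter names come from this document:

```shell
# Hypothetical submission; replace the class and jar with your application's.
# The added algorithm parameters can be passed as --conf entries instead of
# being set on the SparkSession in code.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --class com.example.RFTraining \
  --conf spark.sophon.ml.rf.numTrainingDataCopies=2 \
  --conf spark.sophon.ml.rf.broadcastVariables=true \
  rf-training.jar
```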