RF
There are ML classification and ML regression model APIs for the RF algorithm.
| Model API Type | Function API |
|---|---|
| ML Classification API | def fit(dataset: Dataset[_]): RandomForestClassificationModel |
| | def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel |
| | def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel] |
| | def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel |
| ML Regression API | def fit(dataset: Dataset[_]): RandomForestRegressionModel |
| | def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel |
| | def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel] |
| | def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel |
ML Classification API
- Function
This API imports sample data in Dataset format, invokes the training API, and outputs an RF classification model.
- Input and output
- Package name: package org.apache.spark.ml.classification
- Class name: RandomForestClassifier
- Method name: fit
- Input: training sample data (Dataset[_]). The following fields are mandatory.

| Param name | Type(s) | Default | Description |
|---|---|---|---|
| labelCol | Double | "label" | Label column |
| featuresCol | Vector | "features" | Feature vector column |
- Input: the model parameters of the fit API (paramMap, paramMaps, firstParamPair, otherParamPairs), described as follows:

| Param name | Type(s) | Example | Description |
|---|---|---|---|
| paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to the parameter c of model A. |
| paramMaps | Array[ParamMap] | Array[ParamMap](n) | Generates n ParamMap model parameter lists. |
| firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to the parameter c of model A. |
| otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to the parameter e of model A. |
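The override semantics of these parameters can be seen in a minimal sketch, assuming `trainingData` is a prepared Dataset[_] as in the examples later in this section; the parameter values are illustrative:

```scala
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier().setMaxDepth(5)

// A ParamMap passed to fit() overrides the estimator's current settings
// for that call only: this fit uses maxDepth = 10 and numTrees = 30,
// while rf itself keeps maxDepth = 5 for later calls.
val pm = ParamMap(rf.maxDepth -> 10).put(rf.numTrees, 30)
val model = rf.fit(trainingData, pm)
```

This is standard Spark ML behavior: fit copies the estimator with the extra parameters applied, leaving the original instance unchanged.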
- Algorithm parameters
Algorithm Parameter
def setCheckpointInterval(value: Int): RandomForestClassifier.this.type
def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type
def setFeaturesCol(value: String): RandomForestClassifier
def setImpurity(value: String): RandomForestClassifier.this.type
def setLabelCol(value: String): RandomForestClassifier
def setMaxBins(value: Int): RandomForestClassifier.this.type
def setMaxDepth(value: Int): RandomForestClassifier.this.type
def setMinInfoGain(value: Double): RandomForestClassifier.this.type
def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type
def setNumTrees(value: Int): RandomForestClassifier.this.type
def setPredictionCol(value: String): RandomForestClassifier
def setProbabilityCol(value: String): RandomForestClassifier
def setRawPredictionCol(value: String): RandomForestClassifier
def setSeed(value: Long): RandomForestClassifier.this.type
def setSubsamplingRate(value: Double): RandomForestClassifier.this.type
def setThresholds(value: Array[Double]): RandomForestClassifier
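The setters above follow Spark ML's chained builder pattern; a minimal configuration sketch (the parameter values are illustrative, not tuning recommendations):

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setLabelCol("label")             // label column name
  .setFeaturesCol("features")       // feature vector column name
  .setNumTrees(50)                  // number of trees in the forest
  .setMaxDepth(8)                   // maximum depth of each tree
  .setMaxBins(32)                   // bins for discretizing continuous features
  .setImpurity("gini")              // split criterion: "gini" or "entropy"
  .setSubsamplingRate(0.8)          // fraction of data sampled per tree
  .setFeatureSubsetStrategy("sqrt") // features considered at each split
  .setSeed(42L)                     // fixed seed for reproducibility
```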
- Added algorithm parameters

| Parameter | spark conf Parameter Name | Description | Type |
|---|---|---|---|
| numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of training data copies | Integer. The value must be greater than or equal to 1 (default: 1). |
| broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables that occupy large storage space | Boolean. The default value is false. |
| numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions of a single training data copy | Integer. The value must be greater than or equal to 0 (default: 0, indicating that re-partitioning is not performed). |
| binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of features in training sample data | String. The supported storage formats are array (default) and fasthashmap. |
An example is provided as follows:
import org.apache.spark.ml.param.{ParamMap, ParamPair}

val rf = new RandomForestClassifier() // Define the random forest classifier.

// Define the def fit(dataset: Dataset[_], paramMap: ParamMap) API parameter.
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Define the def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]) API parameter.
val paramMaps = new Array[ParamMap](2)
for (i <- 0 until paramMaps.size) { // Assign a value to each element of paramMaps.
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
}

// Define the def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*) API parameters.
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call the fit APIs.
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)

- Example code for setting the added parameters:

// spark: SparkSession
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

- Output: RF classification model (RandomForestClassificationModel). The field output in model prediction is as follows:
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| predictionCol | Double | "prediction" | Predicted label |
- Sample usage of fit(dataset: Dataset[_]): RandomForestClassificationModel:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on the whole dataset to include all labels in the index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train the model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")

- Sample result
+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  1.0|(47236,[270,439,5...|
|           1.0|  1.0|(47236,[3023,6093...|
|          -1.0| -1.0|(47236,[270,391,4...|
|          -1.0| -1.0|(47236,[3718,3723...|
|           1.0|  1.0|(47236,[729,760,1...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.06476632743800015
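A trained pipeline can also be persisted for later reuse. A brief sketch using Spark's standard MLWriter/MLReader API, assuming `model` and `testData` are the fitted PipelineModel and test split from the example above, and that the save path is illustrative:

```scala
import org.apache.spark.ml.PipelineModel

// Persist the fitted pipeline (indexers + forest + label converter) to storage.
model.write.overwrite().save("/tmp/rfClassificationModel")

// Reload it later, e.g. in a separate scoring job, and predict as before.
val reloaded = PipelineModel.load("/tmp/rfClassificationModel")
reloaded.transform(testData).select("predictedLabel", "label").show(5)
```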
ML Regression API
- Function
This API imports sample data in Dataset format, invokes the training API, and outputs an RF regression model.
- Input and output
- Package name: package org.apache.spark.ml.regression
- Class name: RandomForestRegressor
- Method name: fit
- Input: training sample data (Dataset[_]). The following fields are mandatory.

| Param name | Type(s) | Default | Description |
|---|---|---|---|
| labelCol | Double | "label" | Label column |
| featuresCol | Vector | "features" | Feature vector column |
- Input: the model parameters of the fit API (paramMap, paramMaps, firstParamPair, otherParamPairs), described as follows:

| Param name | Type(s) | Example | Description |
|---|---|---|---|
| paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to the parameter c of model A. |
| paramMaps | Array[ParamMap] | Array[ParamMap](n) | Generates n ParamMap model parameter lists. |
| firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to the parameter c of model A. |
| otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to the parameter e of model A. |
- Algorithm parameters
Algorithm Parameter
def setCheckpointInterval(value: Int): RandomForestRegressor.this.type
def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type
def setFeaturesCol(value: String): RandomForestRegressor
def setImpurity(value: String): RandomForestRegressor.this.type
def setLabelCol(value: String): RandomForestRegressor
def setMaxBins(value: Int): RandomForestRegressor.this.type
def setMaxDepth(value: Int): RandomForestRegressor.this.type
def setMinInfoGain(value: Double): RandomForestRegressor.this.type
def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type
def setNumTrees(value: Int): RandomForestRegressor.this.type
def setPredictionCol(value: String): RandomForestRegressor
def setSeed(value: Long): RandomForestRegressor.this.type
def setSubsamplingRate(value: Double): RandomForestRegressor.this.type
- Added algorithm parameters

| Parameter | spark conf Parameter Name | Description | Type |
|---|---|---|---|
| numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of training data copies | Integer. The value must be greater than or equal to 1 (default: 1). |
| broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables that occupy large storage space | Boolean. The default value is false. |
| numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions of a single training data copy | Integer. The value must be greater than or equal to 0 (default: 0, indicating that re-partitioning is not performed). |
| binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of features in training sample data | String. The supported storage formats are array (default) and fasthashmap. |
An example is provided as follows:
import org.apache.spark.ml.param.{ParamMap, ParamPair}

val rf = new RandomForestRegressor() // Define the random forest regressor.

// Define the def fit(dataset: Dataset[_], paramMap: ParamMap) API parameter.
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Define the def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]) API parameter.
val paramMaps = new Array[ParamMap](2)
for (i <- 0 until paramMaps.size) { // Assign a value to each element of paramMaps.
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
}

// Define the def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*) API parameters.
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call the fit APIs.
model = rf.fit(trainingData)
model = rf.fit(trainingData, paramMap)
models = rf.fit(trainingData, paramMaps)
model = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)

- Example code for setting the added parameters:

// spark: SparkSession
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

- Output: RF regression model (RandomForestRegressionModel or Seq[RandomForestRegressionModel]). The following table lists the field output in model prediction.
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| predictionCol | Double | "prediction" | Predicted label |
- Sample usage of fit(dataset: Dataset[_]): RandomForestRegressionModel:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train the model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")

- Sample result
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|      0.51|  0.3|(1000,[0,1,2,3,4,...|
+----------+-----+--------------------+

root mean squared error = 0.21000000000000002
API applicability:
- The algorithm APIs in this section apply to the Kunpeng-empowered HDP platform. The Java development environment requires JDK 1.8 or later; the Spark development environment requires Spark 2.3.2; HDP 3.1.0 or later is required.
- To run this algorithm on the HDP platform, the following components must be deployed: HDFS, Spark2, Yarn, ZooKeeper, Hive, and MapReduce2.