RF provides two groups of model interfaces: the ML Classification API and the ML Regression API.
| Model interface category | Function interface |
|---|---|
| ML Classification API | `def fit(dataset: Dataset[_]): RandomForestClassificationModel` |
| | `def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestClassificationModel` |
| | `def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestClassificationModel]` |
| | `def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestClassificationModel` |
| ML Regression API | `def fit(dataset: Dataset[_]): RandomForestRegressionModel` |
| | `def fit(dataset: Dataset[_], paramMap: ParamMap): RandomForestRegressionModel` |
| | `def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[RandomForestRegressionModel]` |
| | `def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): RandomForestRegressionModel` |
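For orientation, here is a minimal sketch of the single-argument `fit` variant in both APIs, assuming `trainingData` is a DataFrame that already contains `label` and `features` columns:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.regression.RandomForestRegressor

// trainingData: a DataFrame with "label" and "features" columns, prepared beforehand
val clfModel = new RandomForestClassifier().fit(trainingData) // RandomForestClassificationModel
val regModel = new RandomForestRegressor().fit(trainingData)  // RandomForestRegressionModel
```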
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| labelCol | Double | "label" | Label column |
| featuresCol | Vector | "features" | Features column |
| Param name | Type(s) | Example | Description |
|---|---|---|---|
| paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to parameter c of model A |
| paramMaps | Array[ParamMap] | Array[ParamMap](n) | Builds a list of n ParamMaps of model parameters |
| firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to parameter c of model A |
| otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to parameter e of model A |
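To make the table concrete, here is a minimal sketch of passing parameters at fit time; the estimator `rf`, the value 30, and `trainingData` are illustrative. Parameters supplied this way apply to that `fit` call only and override values set through the setters:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.param.{ParamMap, ParamPair}

val rf = new RandomForestClassifier().setNumTrees(10)

// trainingData: a DataFrame prepared beforehand
// ParamMap(A.c -> b): assign 30 to rf's numTrees for this fit call only
val m1 = rf.fit(trainingData, ParamMap(rf.numTrees -> 30))

// ParamPair(A.c, b): the same assignment expressed as a single pair
val m2 = rf.fit(trainingData, ParamPair(rf.numTrees, 30))
```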
| Algorithm parameters |
|---|
| `def setCheckpointInterval(value: Int): RandomForestClassifier.this.type` |
| `def setFeatureSubsetStrategy(value: String): RandomForestClassifier.this.type` |
| `def setFeaturesCol(value: String): RandomForestClassifier` |
| `def setImpurity(value: String): RandomForestClassifier.this.type` |
| `def setLabelCol(value: String): RandomForestClassifier` |
| `def setMaxBins(value: Int): RandomForestClassifier.this.type` |
| `def setMaxDepth(value: Int): RandomForestClassifier.this.type` |
| `def setMinInfoGain(value: Double): RandomForestClassifier.this.type` |
| `def setMinInstancesPerNode(value: Int): RandomForestClassifier.this.type` |
| `def setNumTrees(value: Int): RandomForestClassifier.this.type` |
| `def setPredictionCol(value: String): RandomForestClassifier` |
| `def setProbabilityCol(value: String): RandomForestClassifier` |
| `def setRawPredictionCol(value: String): RandomForestClassifier` |
| `def setSeed(value: Long): RandomForestClassifier.this.type` |
| `def setSubsamplingRate(value: Double): RandomForestClassifier.this.type` |
| `def setThresholds(value: Array[Double]): RandomForestClassifier` |
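A minimal sketch of chaining these setters; the column names and numeric values are illustrative, not tuned recommendations:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(10)
  .setMaxBins(32)
  .setImpurity("gini")
  .setSubsamplingRate(0.8)
  .setSeed(42L)
```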
| Parameter name | Spark conf key | Meaning | Type and values |
|---|---|---|---|
| numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; default 1; must be >= 1 |
| broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false |
| numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning) |
| binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; supported formats: array, fasthashmap; default array |
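These keys can also be supplied when the SparkSession is built, which is equivalent to calling `spark.conf.set` before training; a sketch with illustrative values:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rf-example") // illustrative application name
  .config("spark.sophon.ml.rf.numTrainingDataCopies", "1")
  .config("spark.sophon.ml.rf.binnedFeaturesDataType", "array")
  .getOrCreate()
```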
Example of the parameters and the fit interfaces:
```scala
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.param.{ParamMap, ParamPair}

// maxDepth, maxDepth1, maxDepth2, numTrees and trainingData are assumed to be defined by the caller
val rf = new RandomForestClassifier() // define the random forest classifier

// Argument for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Argument for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- paramMaps.indices) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
} // populate paramMaps

// Arguments for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
val model = rf.fit(trainingData)
val modelFromMap = rf.fit(trainingData, paramMap)
val modelsFromMaps = rf.fit(trainingData, paramMaps)
val modelFromPairs = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
```
```scala
// spark: SparkSession
// binnedFeaturesType, numCopiesInput, pt and bcVariables are assumed to be defined by the caller
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
```
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| predictionCol | Double | "prediction" | Predicted label |
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
```
```
+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  1.0|(47236,[270,439,5...|
|           1.0|  1.0|(47236,[3023,6093...|
|          -1.0| -1.0|(47236,[270,391,4...|
|          -1.0| -1.0|(47236,[3718,3723...|
|           1.0|  1.0|(47236,[729,760,1...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.06476632743800015
```
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| labelCol | Double | "label" | Label column |
| featuresCol | Vector | "features" | Features column |
| Param name | Type(s) | Example | Description |
|---|---|---|---|
| paramMap | ParamMap | ParamMap(A.c -> b) | Assigns the value b to parameter c of model A |
| paramMaps | Array[ParamMap] | Array[ParamMap](n) | Builds a list of n ParamMaps of model parameters |
| firstParamPair | ParamPair | ParamPair(A.c, b) | Assigns the value b to parameter c of model A |
| otherParamPairs | ParamPair | ParamPair(A.e, f) | Assigns the value f to parameter e of model A |
| Algorithm parameters |
|---|
| `def setCheckpointInterval(value: Int): RandomForestRegressor.this.type` |
| `def setFeatureSubsetStrategy(value: String): RandomForestRegressor.this.type` |
| `def setFeaturesCol(value: String): RandomForestRegressor` |
| `def setImpurity(value: String): RandomForestRegressor.this.type` |
| `def setLabelCol(value: String): RandomForestRegressor` |
| `def setMaxBins(value: Int): RandomForestRegressor.this.type` |
| `def setMaxDepth(value: Int): RandomForestRegressor.this.type` |
| `def setMinInfoGain(value: Double): RandomForestRegressor.this.type` |
| `def setMinInstancesPerNode(value: Int): RandomForestRegressor.this.type` |
| `def setNumTrees(value: Int): RandomForestRegressor.this.type` |
| `def setPredictionCol(value: String): RandomForestRegressor` |
| `def setSeed(value: Long): RandomForestRegressor.this.type` |
| `def setSubsamplingRate(value: Double): RandomForestRegressor.this.type` |
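As with the classifier, a minimal sketch of chaining the regressor's setters; the column names and values are illustrative ("variance" is the impurity used for regression in Spark ML):

```scala
import org.apache.spark.ml.regression.RandomForestRegressor

val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(20)
  .setMaxDepth(10)
  .setImpurity("variance")
  .setSubsamplingRate(0.8)
  .setSeed(42L)
```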
| Parameter name | Spark conf key | Meaning | Type and values |
|---|---|---|---|
| numTrainingDataCopies | spark.sophon.ml.rf.numTrainingDataCopies | Number of copies of the training data | Int; default 1; must be >= 1 |
| broadcastVariables | spark.sophon.ml.rf.broadcastVariables | Whether to broadcast variables with a large memory footprint | Boolean; default false |
| numPartsPerTrainingDataCopy | spark.sophon.ml.rf.numPartsPerTrainingDataCopy | Number of partitions per copy of the training data | Int; must be >= 0; default 0 (0 means no repartitioning) |
| binnedFeaturesDataType | spark.sophon.ml.rf.binnedFeaturesDataType | Storage format of the binned features in the training samples | String; allowed values: array or fasthashmap; default array |
Example of the parameters and the fit interfaces:
```scala
import org.apache.spark.ml.param.{ParamMap, ParamPair}
import org.apache.spark.ml.regression.RandomForestRegressor

// maxDepth, maxDepth1, maxDepth2, numTrees and trainingData are assumed to be defined by the caller
val rf = new RandomForestRegressor() // define the regression model

// Argument for def fit(dataset: Dataset[_], paramMap: ParamMap)
val paramMap = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)

// Argument for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
val paramMaps = new Array[ParamMap](2)
for (i <- paramMaps.indices) {
  paramMaps(i) = ParamMap(rf.maxDepth -> maxDepth).put(rf.numTrees, numTrees)
} // populate paramMaps

// Arguments for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
val firstParamPair = ParamPair(rf.maxDepth, maxDepth1)
val otherParamPairs_1st = ParamPair(rf.maxDepth, maxDepth2)
val otherParamPairs_2nd = ParamPair(rf.numTrees, numTrees)

// Call each fit interface
val model = rf.fit(trainingData)
val modelFromMap = rf.fit(trainingData, paramMap)
val modelsFromMaps = rf.fit(trainingData, paramMaps)
val modelFromPairs = rf.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)
```
```scala
// spark: SparkSession
// binnedFeaturesType, numCopiesInput, pt and bcVariables are assumed to be defined by the caller
spark.conf.set("spark.sophon.ml.rf.binnedFeaturesDataType", binnedFeaturesType)
spark.conf.set("spark.sophon.ml.rf.numTrainingDataCopies", numCopiesInput)
spark.conf.set("spark.sophon.ml.rf.numPartsPerTrainingDataCopy", pt)
spark.conf.set("spark.sophon.ml.rf.broadcastVariables", bcVariables)

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
```
| Param name | Type(s) | Default | Description |
|---|---|---|---|
| predictionCol | Double | "prediction" | Predicted label |
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
```
```
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|      0.51|  0.3|(1000,[0,1,2,3,4,...|
+----------+-----+--------------------+

root mean squared error = 0.21000000000000002
```
Notes on interface applicability: