
DecisionTree

DecisionTree provides two categories of model interfaces: ML Classification API and ML Regression API.

Model interface category   Function interface

ML Classification API      def fit(dataset: Dataset[_]): DecisionTreeClassificationModel
                           def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[DecisionTreeClassificationModel]
                           def fit(dataset: Dataset[_], paramMap: ParamMap): DecisionTreeClassificationModel
                           def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DecisionTreeClassificationModel

ML Regression API          def fit(dataset: Dataset[_]): DecisionTreeRegressionModel
                           def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[DecisionTreeRegressionModel]
                           def fit(dataset: Dataset[_], paramMap: ParamMap): DecisionTreeRegressionModel
                           def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): DecisionTreeRegressionModel
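Every overload above takes the training data as a `Dataset[_]`. The following is a minimal sketch, written against the standard Apache Spark ML API and assuming the documented classes keep the same signatures, that builds a four-row toy DataFrame with the default `label` and `features` columns and calls three of the `fit` overloads. The object name `FitOverloadsSketch` and the toy data are hypothetical, for illustration only.

```scala
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object; not part of the documented API.
object FitOverloadsSketch {
  // Toy training set using the default column names: "label" (Double) and "features" (Vector).
  def toyData(spark: SparkSession): DataFrame =
    spark.createDataFrame(Seq(
      (0.0, Vectors.dense(0.0, 1.0)),
      (0.0, Vectors.dense(0.1, 0.9)),
      (1.0, Vectors.dense(1.0, 0.0)),
      (1.0, Vectors.dense(0.9, 0.2))
    )).toDF("label", "features")

  def run(spark: SparkSession)
      : (DecisionTreeClassifier, DecisionTreeClassificationModel, Seq[DecisionTreeClassificationModel]) = {
    val training = toyData(spark)
    val dt = new DecisionTreeClassifier()

    // fit(dataset): trains with the parameters currently set on dt.
    val single: DecisionTreeClassificationModel = dt.fit(training)

    // fit(dataset, paramMap): the ParamMap applies to this call only; dt itself is not modified.
    val shallow: DecisionTreeClassificationModel = dt.fit(training, ParamMap(dt.maxDepth -> 1))

    // fit(dataset, paramMaps): one model per ParamMap, returned in the same order.
    val grid: Seq[DecisionTreeClassificationModel] =
      dt.fit(training, Array(ParamMap(dt.maxDepth -> 1), ParamMap(dt.maxDepth -> 3)))

    println(s"single depth = ${single.depth}, shallow depth = ${shallow.depth}, models = ${grid.length}")
    (dt, shallow, grid)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dt-fit-overloads").getOrCreate()
    try {
      run(spark)
    } finally {
      spark.stop()
    }
  }
}
```

Because `fit(dataset, paramMap)` trains on a copy of the estimator, `dt.getMaxDepth` still returns its default afterwards; this is why the overloads that take a ParamMap are convenient for trying several settings from one estimator.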

ML Classification API

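  • Function description

    Pass in sample data in Dataset format and call the training interface to output a decision tree classification model.

  • Input and output
    1. Package name: package org.apache.spark.ml.classification
    2. Class name: DecisionTreeClassifier
    3. Method name: fit
    4. Input: Dataset[_], the training sample data. The required fields are as follows.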

      Param name    Type(s)   Default      Description
      labelCol      Double    "label"      Label to predict
      featuresCol   Vector    "features"   Feature vector

    5. Algorithm parameters

      def setCheckpointInterval(value: Int): DecisionTreeClassifier.this.type
        Specifies how often to checkpoint the cached node IDs.
      def setFeaturesCol(value: String): DecisionTreeClassifier
      def setImpurity(value: String): DecisionTreeClassifier.this.type
      def setLabelCol(value: String): DecisionTreeClassifier
      def setMaxBins(value: Int): DecisionTreeClassifier.this.type
      def setMaxDepth(value: Int): DecisionTreeClassifier.this.type
      def setMinInfoGain(value: Double): DecisionTreeClassifier.this.type
      def setMinInstancesPerNode(value: Int): DecisionTreeClassifier.this.type
      def setPredictionCol(value: String): DecisionTreeClassifier
      def setProbabilityCol(value: String): DecisionTreeClassifier
      def setRawPredictionCol(value: String): DecisionTreeClassifier
      def setSeed(value: Long): DecisionTreeClassifier.this.type
      def setThresholds(value: Array[Double]): DecisionTreeClassifier

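    6. New algorithm parameters

      Parameter name                Type and valid values                                 Meaning
      numTrainingDataCopies         Int; default 1; must be >= 1                          Number of copies of the training data
      broadcastVariables            Boolean; default false                                Whether to broadcast variables that occupy a large amount of storage
      numPartsPerTrainingDataCopy   Int; must be >= 0; default 0 (0: no repartitioning)   Number of partitions per training data copy
      binnedFeaturesDataType        String; array or fasthashmap; default array           Storage format of the features in the training samples
      copyStrategy                  String; normal or plus; default normal                Copy allocation strategy
      numFeaturesOptFindSplits      Int; default 8196                                     Dimension threshold for enabling the high-dimensional feature split-point search optimization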

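      Example of the parameters and fit interface calls:

      import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}
      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      
      // Assumes trainingData is a DataFrame with "label" and "features" columns.
      // Example parameter values; replace them with your own.
      val maxDepth = 5
      val maxBins = 32
      val maxDepth1 = 5
      val maxDepth2 = 10
      
      val dt = new DecisionTreeClassifier() // define the estimator
      
      // Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(dt.maxDepth -> maxDepth).put(dt.maxBins, maxBins)
      
      // Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- 0 until paramMaps.length) { // `until` stops at the last valid index
        paramMaps(i) = ParamMap(dt.maxDepth -> maxDepth).put(dt.maxBins, maxBins)
      } // fill paramMaps
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      // When the same Param appears more than once, the later pair wins (maxDepth2 here).
      val firstParamPair = ParamPair(dt.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(dt.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(dt.maxBins, maxBins)
      
      // Call each fit overload
      val model1: DecisionTreeClassificationModel = dt.fit(trainingData)
      val model2: DecisionTreeClassificationModel = dt.fit(trainingData, paramMap)
      val models: Seq[DecisionTreeClassificationModel] = dt.fit(trainingData, paramMaps)
      val model3: DecisionTreeClassificationModel =
        dt.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)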
    7. Output: DecisionTreeClassificationModel, the decision tree classification model. The fields it outputs at prediction time are as follows.

      Param name         Type(s)   Default           Description
      predictionCol      Double    "prediction"      Predicted label
      rawPredictionCol   Vector    "rawPrediction"   Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction
      probabilityCol     Vector    "probability"     Vector of length # classes equal to rawPrediction normalized to a multinomial distribution
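The setters listed in item 5 each return the estimator, so they can be chained, and the prediction-time output columns from item 7 are renamed the same way. Below is a minimal sketch using only standard Apache Spark ML setters and getters; the object name `ClassifierConfigSketch` is hypothetical and the concrete values are illustrative, not tuning recommendations. No SparkSession is needed because it only configures the estimator.

```scala
import org.apache.spark.ml.classification.DecisionTreeClassifier

// Hypothetical helper object for illustration.
object ClassifierConfigSketch {
  // Builds a configured estimator; all values are placeholders.
  def configured(): DecisionTreeClassifier =
    new DecisionTreeClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setImpurity("entropy")             // "gini" (default) or "entropy"
      .setMaxDepth(4)                     // maximum depth of the tree
      .setMaxBins(32)                     // bins used to discretize continuous features
      .setMinInstancesPerNode(2)          // each child must keep at least this many rows after a split
      .setMinInfoGain(0.0)                // minimum information gain required to split a node
      .setSeed(42L)
      .setCheckpointInterval(10)          // used only with node-ID caching and a checkpoint directory
      .setPredictionCol("prediction")
      .setRawPredictionCol("rawPrediction")
      .setProbabilityCol("probability")

  def main(args: Array[String]): Unit = {
    // explainParams() lists every parameter with its current and default value.
    println(configured().explainParams())
  }
}
```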

  • Usage example
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.DecisionTreeClassificationModel
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    
    // Intended for spark-shell, where `spark` (a SparkSession) is predefined;
    // enter multi-line statements in :paste mode.
    // Load the data stored in LIBSVM format as a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features, and index them.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
      .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a DecisionTree model.
    val dt = new DecisionTreeClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")
    
    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
    
    // Chain indexers and tree in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
    
    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${1.0 - accuracy}")
    
    // The fitted tree is the third stage (index 2) of the pipeline model.
    val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
    println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
  • Result example
    +--------------+-----+--------------------+
    |predictedLabel|label|            features|
    +--------------+-----+--------------------+
    |           1.0|  1.0|(47236,[270,439,5...|
    |           1.0|  1.0|(47236,[3023,6093...|
    |          -1.0| -1.0|(47236,[270,391,4...|
    |          -1.0| -1.0|(47236,[3718,3723...|
    |           1.0|  1.0|(47236,[729,760,1...|
    +--------------+-----+--------------------+
    only showing top 5 rows
    
    Test Error = 0.06476632743800015
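According to the classification output fields, rawPredictionCol holds the per-class counts of training labels at the leaf that makes the prediction, and probabilityCol is that vector normalized to a multinomial distribution. The arithmetic can be sketched in plain Scala as below; the object and function names are hypothetical, unit instance weights are assumed, and the argmax rule for the predicted class assumes `thresholds` is not set.

```scala
// Hypothetical helper object illustrating the rawPrediction -> probability -> prediction relation.
object LeafProbabilitySketch {
  // Normalize per-class label counts at a leaf into a multinomial distribution.
  def normalize(rawCounts: Array[Double]): Array[Double] = {
    val total = rawCounts.sum
    require(total > 0.0, "a leaf must contain at least one training instance")
    rawCounts.map(_ / total)
  }

  // Without thresholds, the predicted class index is the position of the largest value.
  def predict(rawCounts: Array[Double]): Int =
    rawCounts.indices.maxBy(i => rawCounts(i))

  def main(args: Array[String]): Unit = {
    val raw = Array(1.0, 3.0) // 1 training row of class 0 and 3 of class 1 reached this leaf
    println(normalize(raw).mkString("[", ", ", "]")) // [0.25, 0.75]
    println(predict(raw))                            // 1
  }
}
```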

ML Regression API

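  • Function description

    Pass in sample data in Dataset format and call the training interface to output a decision tree regression model.

  • Input and output
    1. Package name: package org.apache.spark.ml.regression
    2. Class name: DecisionTreeRegressor
    3. Method name: fit
    4. Input: Dataset[_], the training sample data. The required fields are as follows.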

      Param name    Type(s)   Default      Description
      labelCol      Double    "label"      Label to predict
      featuresCol   Vector    "features"   Feature vector

    5. Algorithm parameters

      def setCheckpointInterval(value: Int): DecisionTreeRegressor.this.type
        Specifies how often to checkpoint the cached node IDs.
      def setFeaturesCol(value: String): DecisionTreeRegressor
      def setImpurity(value: String): DecisionTreeRegressor.this.type
      def setLabelCol(value: String): DecisionTreeRegressor
      def setMaxBins(value: Int): DecisionTreeRegressor.this.type
      def setMaxDepth(value: Int): DecisionTreeRegressor.this.type
      def setMinInfoGain(value: Double): DecisionTreeRegressor.this.type
      def setMinInstancesPerNode(value: Int): DecisionTreeRegressor.this.type
      def setPredictionCol(value: String): DecisionTreeRegressor
      def setSeed(value: Long): DecisionTreeRegressor.this.type
      def setVarianceCol(value: String): DecisionTreeRegressor.this.type

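    6. New algorithm parameters

      Parameter name                Type and valid values                                 Meaning
      numTrainingDataCopies         Int; default 1; must be >= 1                          Number of copies of the training data
      broadcastVariables            Boolean; default false                                Whether to broadcast variables that occupy a large amount of storage
      numPartsPerTrainingDataCopy   Int; must be >= 0; default 0 (0: no repartitioning)   Number of partitions per training data copy
      binnedFeaturesDataType        String; array or fasthashmap; default array           Storage format of the features in the training samples
      copyStrategy                  String; normal or plus; default normal                Copy allocation strategy
      numFeaturesOptFindSplits      Int; default 8196                                     Dimension threshold for enabling the high-dimensional feature split-point search optimization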

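      Example of the parameters and fit interface calls:

      import org.apache.spark.ml.param.{ParamMap, ParamPair}
      import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
      
      // Assumes trainingData is a DataFrame with "label" and "features" columns.
      // Example parameter values; replace them with your own.
      val maxDepth = 5
      val maxBins = 32
      val maxDepth1 = 5
      val maxDepth2 = 10
      
      val dt = new DecisionTreeRegressor() // define the estimator
      
      // Parameters for def fit(dataset: Dataset[_], paramMap: ParamMap)
      val paramMap = ParamMap(dt.maxDepth -> maxDepth).put(dt.maxBins, maxBins)
      
      // Parameters for def fit(dataset: Dataset[_], paramMaps: Array[ParamMap])
      val paramMaps = new Array[ParamMap](2)
      for (i <- 0 until paramMaps.length) { // `until` stops at the last valid index
        paramMaps(i) = ParamMap(dt.maxDepth -> maxDepth).put(dt.maxBins, maxBins)
      } // fill paramMaps
      
      // Parameters for def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*)
      // When the same Param appears more than once, the later pair wins (maxDepth2 here).
      val firstParamPair = ParamPair(dt.maxDepth, maxDepth1)
      val otherParamPairs_1st = ParamPair(dt.maxDepth, maxDepth2)
      val otherParamPairs_2nd = ParamPair(dt.maxBins, maxBins)
      
      // Call each fit overload
      val model1: DecisionTreeRegressionModel = dt.fit(trainingData)
      val model2: DecisionTreeRegressionModel = dt.fit(trainingData, paramMap)
      val models: Seq[DecisionTreeRegressionModel] = dt.fit(trainingData, paramMaps)
      val model3: DecisionTreeRegressionModel =
        dt.fit(trainingData, firstParamPair, otherParamPairs_1st, otherParamPairs_2nd)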
    7. Output: DecisionTreeRegressionModel, the decision tree regression model. The fields it outputs at prediction time are as follows.

      Param name      Type(s)   Default        Description
      predictionCol   Double    "prediction"   Predicted label
      varianceCol     Double    (none)         The biased sample variance of prediction
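The varianceCol field reports the biased sample variance at the leaf that makes the prediction: with the variance impurity, a regression leaf predicts the mean label of the training rows that reached it, and the variance divides by n rather than n - 1. A plain-Scala worked example of that arithmetic follows; the object and function names are hypothetical and unweighted rows are assumed.

```scala
// Hypothetical helper object illustrating leaf prediction and biased variance.
object LeafVarianceSketch {
  // Prediction of a regression leaf: mean of the training labels that reached it.
  def leafPrediction(labels: Seq[Double]): Double = {
    require(labels.nonEmpty, "a leaf must contain at least one training instance")
    labels.sum / labels.size
  }

  // Biased sample variance: mean squared deviation from the leaf mean, dividing by n (not n - 1).
  def biasedVariance(labels: Seq[Double]): Double = {
    val mean = leafPrediction(labels)
    labels.map(y => (y - mean) * (y - mean)).sum / labels.size
  }

  def main(args: Array[String]): Unit = {
    val leafLabels = Seq(1.0, 2.0, 3.0)
    println(leafPrediction(leafLabels)) // 2.0
    println(biasedVariance(leafLabels)) // (1 + 0 + 1) / 3, about 0.667
  }
}
```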

  • Usage example
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.feature.VectorIndexer
    import org.apache.spark.ml.regression.DecisionTreeRegressionModel
    import org.apache.spark.ml.regression.DecisionTreeRegressor
    
    // Intended for spark-shell, where `spark` (a SparkSession) is predefined;
    // enter multi-line statements in :paste mode.
    // Load the data stored in LIBSVM format as a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    // Automatically identify categorical features, and index them.
    // Here, we treat features with > 4 distinct values as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4)
      .fit(data)
    
    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a DecisionTree model.
    val dt = new DecisionTreeRegressor()
      .setLabelCol("label")
      .setFeaturesCol("indexedFeatures")
    
    // Chain indexer and tree in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(featureIndexer, dt))
    
    // Train model. This also runs the indexer.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    
    // Select example rows to display.
    predictions.select("prediction", "label", "features").show(5)
    
    // Select (prediction, true label) and compute test error.
    val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
    
    val rmse = evaluator.evaluate(predictions)
    println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
    
    // The fitted tree is the second stage (index 1) of the pipeline model.
    val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
    println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
  • Result example
    +----------+-----+--------------------+
    |prediction|label|            features|
    +----------+-----+--------------------+
    |      0.51|  0.3|(1000,[0,1,2,3,4,...|
    +----------+-----+--------------------+
    
    Root Mean Squared Error (RMSE) on test data = 0.21000000000000002
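The usage example above does not set varianceCol, so its output contains no variance column. The following minimal sketch, assuming the standard Apache Spark ML API and a local SparkSession, configures DecisionTreeRegressor with setImpurity("variance") (the only impurity Spark's regressor supports) and setVarianceCol, fits a four-row toy dataset, and selects the prediction and variance columns. The object name `RegressorVarianceSketch`, the toy data, and the column name "variance" are hypothetical.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object for illustration.
object RegressorVarianceSketch {
  def run(spark: SparkSession): DataFrame = {
    // Toy regression data: continuous Double labels and a one-dimensional feature vector.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0)),
      (1.2, Vectors.dense(0.1)),
      (3.0, Vectors.dense(1.0)),
      (3.4, Vectors.dense(1.1))
    )).toDF("label", "features")

    val dtr = new DecisionTreeRegressor()
      .setImpurity("variance")
      .setMaxDepth(2)
      .setVarianceCol("variance") // without this, transform() emits no variance column

    val model: DecisionTreeRegressionModel = dtr.fit(training)
    model.transform(training).select("label", "prediction", "variance")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dtr-variance").getOrCreate()
    try {
      run(spark).show()
    } finally {
      spark.stop()
    }
  }
}
```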