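## I. Method Overview
A decision tree is a basic method for both classification and regression. This article focuses on decision trees for classification. A decision tree model has a tree structure: each internal node represents a test on an attribute, each branch represents an outcome of that test, and each leaf node represents a class. During learning, the model is built from the training data by minimizing a loss function. During prediction, new data are classified with the learned model.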
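The structure described above can be sketched as a small Scala data type. This is a minimal illustration that assumes binary splits of the form `feature <= threshold`, which is the form Spark prints for continuous features later in this tutorial. The names `Tree`, `Leaf`, `Split` and `predict` are hypothetical and are not part of Spark's API.

```scala
// Minimal decision-tree representation (hypothetical names, not Spark's API):
// internal nodes test one feature against a threshold, the two branches are
// the two test outcomes, and leaves hold a class label.
sealed trait Tree
final case class Leaf(label: String) extends Tree
final case class Split(feature: Int, threshold: Double, left: Tree, right: Tree) extends Tree

object DecisionTreeSketch {
  // Prediction: start at the root and follow the branch chosen by each test
  // until a leaf is reached; the leaf's label is the predicted class.
  @annotation.tailrec
  def predict(tree: Tree, x: Array[Double]): String = tree match {
    case Leaf(label) => label
    case Split(f, t, left, right) =>
      if (x(f) <= t) predict(left, x) else predict(right, x)
  }
}
```

For example, the tree `Split(0, 2.5, Leaf("A"), Split(1, 1.0, Leaf("B"), Leaf("C")))` sends `[3.0, 0.5]` right at the root, then left at the second test, and predicts `"B"`.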
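## II. Basic Principles
### (1) Feature Selection
First, define entropy, a measure widely used in information theory that quantifies the uncertainty of a random variable. The larger the entropy, the more uncertain the variable. Information gain measures how much knowing the value of a feature reduces the uncertainty about the class. Put simply, the information gain of an attribute is the expected reduction in entropy caused by partitioning the samples on that attribute. Information gain, the information gain ratio, and the Gini index are defined as follows.
**Information gain**: the information gain $g(D,A)$ of feature A with respect to training set D is the difference between the empirical entropy $H(D)$ of D and the empirical conditional entropy $H(D|A)$ of D given feature A: $$ g(D,A)=H(D)-H(D|A) $$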
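The text uses $H(D)$ and $H(D|A)$ without writing them out. The standard definitions are added here: $H(D)=-\sum_{k=1}^{K}\frac{\left|C_k\right|}{\left|D\right|}\log_2\frac{\left|C_k\right|}{\left|D\right|}$, where $C_k$ is the set of samples of class k, and $H(D|A)=\sum_{i=1}^{n}\frac{\left|D_i\right|}{\left|D\right|}H(D_i)$, where $D_i$ is the subset of D on which A takes its i-th value. The sketch below is a minimal plain-Scala version. It assumes a discrete feature and samples given as `(featureValue, label)` pairs. `InfoGain` and its methods are illustrative names, not Spark's API.

```scala
object InfoGain {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // Empirical entropy H(D), with class probabilities estimated by frequencies.
  def entropy[L](labels: Seq[L]): Double = {
    val n = labels.size.toDouble
    labels.groupBy(identity).values.map { group =>
      val p = group.size / n
      -p * log2(p)
    }.sum
  }

  // Empirical conditional entropy H(D|A): group the samples by the value of
  // feature A (the subsets D_i) and take the size-weighted entropy of each.
  def conditionalEntropy[V, L](samples: Seq[(V, L)]): Double = {
    val n = samples.size.toDouble
    samples.groupBy(_._1).values.map { subset =>
      (subset.size / n) * entropy(subset.map(_._2))
    }.sum
  }

  // Information gain g(D, A) = H(D) - H(D|A).
  def informationGain[V, L](samples: Seq[(V, L)]): Double =
    entropy(samples.map(_._2)) - conditionalEntropy(samples)
}
```

A feature that perfectly separates two equally frequent classes has a gain of 1 bit. A feature that is independent of the class has a gain of 0. As a reference point, the full iris data set has 50 samples in each of 3 classes, so $H(D)=\log_2 3\approx 1.585$ bits.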
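**Information gain ratio**: the information gain ratio $g_R(D,A)$ of feature A with respect to training set D is the ratio of its information gain $g(D,A)$ to the entropy $H_A(D)$ of D with respect to the values of feature A: $$ g_R(D,A)=\frac{g(D,A)}{H_A(D)}$$
Here $H_A(D) = - \sum_{i=1}^{n} \frac{\left|D_i\right|}{\left|D\right|}\log_2\frac{\left|D_i\right|}{\left|D\right|}$, where n is the number of distinct values of feature A and $D_i$ is the subset of samples in D on which A takes its i-th value.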
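The sketch below computes the gain ratio under the same assumptions: a discrete feature, with samples given as `(featureValue, label)` pairs. It also shows why the ratio is used. A feature with a distinct value for every sample, such as an ID, can have a high information gain, but its large $H_A(D)$ shrinks the ratio. `GainRatio` is a hypothetical name. Returning 0.0 when $H_A(D)=0$ is a choice made here, not part of the definition.

```scala
object GainRatio {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // Entropy of any discrete sequence (class labels or feature values).
  def entropy[T](xs: Seq[T]): Double = {
    val n = xs.size.toDouble
    xs.groupBy(identity).values.map { g => val p = g.size / n; -p * log2(p) }.sum
  }

  // g_R(D, A) = g(D, A) / H_A(D), where H_A(D) is the entropy of A's values.
  def gainRatio[V, L](samples: Seq[(V, L)]): Double = {
    val n = samples.size.toDouble
    val hD = entropy(samples.map(_._2))
    val hDGivenA = samples.groupBy(_._1).values
      .map(s => (s.size / n) * entropy(s.map(_._2))).sum
    val hA = entropy(samples.map(_._1))
    // Assumption: if A takes a single value, H_A(D) = 0 and the ratio is
    // undefined; this sketch returns 0.0 in that case.
    if (hA == 0.0) 0.0 else (hD - hDGivenA) / hA
  }
}
```

Take four samples split evenly between two classes. A two-valued feature that separates the classes perfectly gets a ratio of 1.0. An ID-like feature with four distinct values has the same gain (1 bit), but $H_A(D)=\log_2 4=2$, so its ratio is only 0.5.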
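**Gini index**: consider a classification problem with K classes, where a sample point belongs to class k with probability $p_k$. The Gini index of this probability distribution is defined as:
$$Gini(p) = \sum_{k=1}^{K} p_k(1 - p_k) = 1-\sum_{k=1}^{K} p_k^2$$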
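The two forms of the Gini index agree because $\sum_k p_k(1-p_k)=\sum_k p_k-\sum_k p_k^2$ and $\sum_k p_k=1$. The sketch below computes the index from class probabilities, or estimates $p_k$ from class frequencies. The helper names are hypothetical.

```scala
object GiniSketch {
  // Gini(p) = 1 - sum_k p_k^2 (equal to sum_k p_k (1 - p_k) when the p_k sum to 1).
  def gini(probs: Seq[Double]): Double = 1.0 - probs.map(p => p * p).sum

  // Gini index of a sample set, estimating p_k by the frequency of class k.
  def giniOfLabels[L](labels: Seq[L]): Double = {
    val n = labels.size.toDouble
    gini(labels.groupBy(identity).values.map(_.size / n).toSeq)
  }
}
```

A pure node gives 0. The full iris data set has three equally frequent classes, so it gives $1-3\cdot(1/3)^2=2/3$.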
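### (2) Generating the Decision Tree
### (3) Pruning the Decision Tree
Pruning is usually carried out by minimizing the overall loss function of the decision tree. This loss function is commonly defined as $$ C_a(T)=C(T)+a\left|T\right| $$ where T is any subtree, $C(T)$ is the prediction error on the training data (measured, for example, by the Gini index), $\left|T\right|$ is the number of leaf nodes of the subtree, $a\ge0$ is a parameter, and $C_a(T)$ is the overall loss of subtree T for that value of $a$. The parameter $a$ trades off how closely the model fits the training data against model complexity. For a fixed $a$ there is always a subtree that minimizes $C_a(T)$; denote it $T_a$. A large $a$ yields a small optimal subtree $T_a$, and a small $a$ yields a large one.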
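The trade-off controlled by $a$ becomes visible when a few candidate subtrees are compared. The errors and leaf counts below are made-up numbers, and `CostComplexity` and `Candidate` are hypothetical names. The sketch only evaluates $C_a(T)=C(T)+a|T|$ and picks the minimum; it does not generate the candidate subtrees. The parameter list printed by `explainParams()` later in this tutorial contains no such pruning parameter. There, tree size is limited through `maxDepth`, `minInstancesPerNode` and `minInfoGain`.

```scala
object CostComplexity {
  // A candidate subtree summarized by its training error C(T) and leaf count |T|.
  final case class Candidate(name: String, trainingError: Double, leaves: Int)

  // C_a(T) = C(T) + a * |T|
  def loss(t: Candidate, a: Double): Double = t.trainingError + a * t.leaves

  // For a fixed a, choose the candidate with the smallest overall loss (T_a).
  def best(candidates: Seq[Candidate], a: Double): Candidate =
    candidates.minBy(loss(_, a))
}
```

Take the made-up candidates `("full", 0.0, 8)`, `("medium", 0.1, 4)` and `("stump", 0.4, 2)`. With $a=0$ the full tree wins, at $a=0.05$ the 4-leaf subtree wins, and at $a=0.5$ the 2-leaf stump wins. This matches the statement that a larger $a$ yields a smaller $T_a$.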
### Example Code
##### 1. Import the required packages:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer}
// Spark 1.x: the spark.ml API works with mllib.linalg vectors.
// In Spark 2.0+ import org.apache.spark.ml.linalg.{Vector, Vectors} instead.
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
```
##### 2. Read the data and take a quick look:
```scala
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@10d83860
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> val observations=sc.textFile("G:/spark/iris.data").map(_.split(",")).map(p => Vectors.dense(p(2).toDouble, p(3).toDouble))
observations: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[14] at map at :37
```
Here a case class, Iris, defines the schema, that is, the structure of the data we need. The data are read into an RDD of Iris records, which is then converted to a DataFrame. Finally, show() displays part of the data. Only columns 2 and 3 of each line are kept as features, and column 4 is the label.
```scala
scala> case class Iris(features: Vector, label: String)
defined class Iris
scala> val data = sc.textFile("G:/spark/iris.data")
     | .map(_.split(","))
     | .map(p => Iris(Vectors.dense(p(2).toDouble, p(3).toDouble), p(4).toString()))
     | .toDF()
data: org.apache.spark.sql.DataFrame = [features: vector, label: string]
scala> data.show()
| features| label|
only showing top 20 rows
```
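The parsing step inside the `map` calls above can be checked without Spark. In the UCI `iris.data` file, each line holds sepal length, sepal width, petal length, petal width and the class name, so `p(2)` and `p(3)` are the petal measurements. The sketch below is plain Scala. `IrisParsing`, `IrisRow` and `parseAll` are hypothetical names, and Scala's own `Vector[Double]` stands in for Spark's `Vector`.

```scala
object IrisParsing {
  // Mirrors the Iris case class above, using Scala's Vector[Double]
  // instead of Spark's Vector so the sketch runs without Spark.
  final case class IrisRow(features: Vector[Double], label: String)

  // Columns: sepal length, sepal width, petal length, petal width, class.
  // As in the tutorial, keep petal length (index 2) and petal width (index 3).
  def parse(line: String): IrisRow = {
    val p = line.split(",")
    IrisRow(Vector(p(2).toDouble, p(3).toDouble), p(4))
  }

  // Defensively skip blank lines (e.g. a trailing empty line), which would
  // otherwise make p(2) throw an ArrayIndexOutOfBoundsException.
  def parseAll(lines: Seq[String]): Seq[IrisRow] =
    lines.map(_.trim).filter(_.nonEmpty).map(parse)
}
```

For example, `parse("5.1,3.5,1.4,0.2,Iris-setosa")` yields features `(1.4, 0.2)` with label `Iris-setosa`, the same values that appear as `[1.4,0.2]` in the predictions below.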
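##### 3. Build the ML pipeline
```scala
// Index the string labels (fit on the full data so every label is known).
scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_6033e13b0b2b
// Index categorical features: VectorIndexer treats a feature with at most
// maxCategories (default 20) distinct values as categorical.
scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_b5a8adea6903
// 70% training, 30% test. No seed is given, so the split (and the results below)
// vary between runs; randomSplit(weights, seed) makes it reproducible.
scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
trainingData: org.apache.spark.sql.DataFrame = [features: vector, label: string]
testData: org.apache.spark.sql.DataFrame = [features: vector, label: string]
```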
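By default, `StringIndexer` orders the label indices by label frequency, so the most frequent label gets 0.0. This is why the learned tree later predicts 0.0, 1.0 and 2.0 instead of species names, and why `IndexToString` is given `labelIndexer.labels` to map the indices back. The sketch below shows the idea in plain Scala; it is not Spark's implementation. Ties are broken alphabetically here by assumption. Iris has 50 samples per class, so all three labels tie, and Spark's actual tie order may differ (the tree below suggests it did).

```scala
object LabelIndexing {
  // Order labels by descending frequency; the most frequent label gets index 0.
  // Assumption: ties are broken alphabetically, which may differ from Spark.
  def fitLabels(labels: Seq[String]): Array[String] =
    labels.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map(_._1)
      .toArray

  // StringIndexer direction: label -> index (as a Double, like Spark's output).
  def toIndex(ordered: Array[String]): Map[String, Double] =
    ordered.zipWithIndex.map { case (l, i) => l -> i.toDouble }.toMap

  // IndexToString direction: index -> label, using the same ordered label array.
  def toLabel(ordered: Array[String], index: Double): String = ordered(index.toInt)
}
```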
Next, set the decision tree's parameters. Here every parameter is set with a setter method; a ParamMap can be used instead (see the Spark MLlib documentation for details). The Gini index is chosen as the impurity measure for selecting splits, and the maximum tree depth is set to 5. explainParams() lists every parameter that can be set, together with the values currently in effect.
```scala
scala> val dt = new DecisionTreeClassifier()
     | .setLabelCol("indexedLabel")
     | .setFeaturesCol("indexedFeatures")
     | .setImpurity("gini")
     | .setMaxDepth(5)
dt: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_16842f2bb6a7
scala> println("DecisionTreeClassifier parameters:\n" + dt.explainParams() + "\n")
DecisionTreeClassifier parameters:
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default:10)
featuresCol: features column name (default: features, current: indexedFeatures)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name (default: label, current: indexedLabel)
maxBins: Max number of bins for discretizing continuous features. Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 5)
maxMemoryInMB: Maximum memory in MB allocated to histogram aggregation. (default: 256)
minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)
minInstancesPerNode: Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1. (default: 1)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
seed: random seed (default: 159147643)
thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold. (undefined)
```
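The `maxDepth` description above implies a simple size bound. Every split is binary, so a tree of depth d has at most $2^d$ leaves and $2^{d+1}-1$ nodes. A quick check, using hypothetical helper names:

```scala
object DepthBounds {
  // Depth 0 = 1 leaf; depth 1 = 1 internal node + 2 leaves (per the maxDepth doc).
  // With binary splits, depth d allows at most 2^d leaves and 2^(d+1) - 1 nodes.
  def maxLeaves(depth: Int): Long = 1L << depth
  def maxNodes(depth: Int): Long = (1L << (depth + 1)) - 1
}
```

With `maxDepth` = 5, a tree can have up to 63 nodes. The model trained below has depth 4 and 13 nodes, well under the 31-node bound for depth 4.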
```scala
// Convert predicted label indices back to the original string labels.
scala> val labelConverter = new IndexToString().
     | setInputCol("prediction").
     | setOutputCol("predictedLabel").
     | setLabels(labelIndexer.labels)
labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_5c051ba9ebd3
scala> val pipeline = new Pipeline().
     | setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
pipeline: org.apache.spark.ml.Pipeline = pipeline_eb9649610b0f
// Fit the pipeline to the training data.
scala> val model = pipeline.fit(trainingData)
model: org.apache.spark.ml.PipelineModel = pipeline_eb9649610b0f
scala> val predictions = model.transform(testData)
predictions: org.apache.spark.sql.DataFrame = [features: vector, label: string, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string]
scala> predictions.
     | select("predictedLabel", "label", "features").
     | collect().
     | foreach { case Row(predictedLabel: String, label: String, features: Vector) =>
     | println(s"($label, $features) --> predictedLabel=$predictedLabel")
     | }
(Iris-setosa, [1.2,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.3]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.3,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.4,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.1]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.5,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.6,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.2]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.7,0.4]) --> predictedLabel=Iris-setosa
(Iris-setosa, [1.9,0.2]) --> predictedLabel=Iris-setosa
(Iris-versicolor, [4.0,1.0]) --> predictedLabel=Iris-versicolor
...
```
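##### 4. Model Evaluation
```scala
// Note: the evaluator's default metric is "f1" (weighted F1), so the value printed
// below is a weighted F1 score rather than accuracy. To compute accuracy, add
// .setMetricName("precision") in Spark 1.x or .setMetricName("accuracy") in Spark 2.0+.
scala> val evaluator = new MulticlassClassificationEvaluator().
     | setLabelCol("indexedLabel").
     | setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_4651752eb9e9
scala> val accuracy = evaluator.evaluate(predictions)
accuracy: Double = 0.96
scala> println("Test Error = " + (1.0 - accuracy))
Test Error = 0.040000000000000036
```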
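The evaluator above uses its default metric, so the 0.96 it reports is a weighted F1 score rather than accuracy. The two usually differ, as the plain-Scala sketch below shows on a tiny made-up set of `(label, prediction)` pairs. The weighting follows the usual definition of weighted F1: each class's F1 is weighted by that class's share of the true labels. `Metrics` is a hypothetical name, not Spark's `MulticlassMetrics`.

```scala
object Metrics {
  // Accuracy: fraction of (label, prediction) pairs where the two agree.
  def accuracy(pairs: Seq[(Double, Double)]): Double =
    pairs.count { case (label, pred) => label == pred }.toDouble / pairs.size

  // Weighted F1: per-class F1 = 2PR / (P + R), weighted by the class's
  // share of the true labels.
  def weightedF1(pairs: Seq[(Double, Double)]): Double = {
    val n = pairs.size.toDouble
    pairs.map(_._1).distinct.map { c =>
      val tp = pairs.count { case (l, p) => l == c && p == c }.toDouble
      val predicted = pairs.count(_._2 == c).toDouble
      val actual = pairs.count(_._1 == c).toDouble
      val precision = if (predicted == 0) 0.0 else tp / predicted
      val recall = tp / actual
      val f1 = if (precision + recall == 0) 0.0 else 2 * precision * recall / (precision + recall)
      f1 * actual / n
    }.sum
  }
}
```

For the pairs `(0,0), (0,0), (0,1), (1,1), (2,1), (2,2)`, accuracy is 4/6 ≈ 0.667, while weighted F1 is about 0.706.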
```scala
// stages(2) is dt (stage order: labelIndexer, featureIndexer, dt, labelConverter).
scala> val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
treeModel: org.apache.spark.ml.classification.DecisionTreeClassificationModel =
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
scala> println("Learned classification tree model:\n" + treeModel.toDebugString)
Learned classification tree model:
DecisionTreeClassificationModel (uid=dtc_16842f2bb6a7) of depth 4 with 13 nodes
  If (feature 0 <= 1.9)
   Predict: 2.0
  Else (feature 0 > 1.9)
   If (feature 1 <= 1.7)
    If (feature 0 <= 4.9)
     If (feature 1 <= 1.6)
      Predict: 0.0
     Else (feature 1 > 1.6)
      Predict: 1.0
    Else (feature 0 > 4.9)
     If (feature 1 <= 1.6)
      Predict: 1.0
     Else (feature 1 > 1.6)
      Predict: 0.0
   Else (feature 1 > 1.7)
    If (feature 0 <= 4.8)
     Predict: 0.0
    Else (feature 0 > 4.8)
     Predict: 1.0
```
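The printed tree can be transcribed into a small data type to check the header line (depth 4, 13 nodes) and to trace predictions by hand. Feature 0 and feature 1 are petal length and petal width (columns 2 and 3 of the file), and the predictions are label indices from `labelIndexer`. The transcription is ours, and all names in it are hypothetical.

```scala
object LearnedTree {
  sealed trait Node
  final case class Leaf(prediction: Double) extends Node
  final case class Split(feature: Int, threshold: Double, left: Node, right: Node) extends Node

  // Transcription of the toDebugString output above; the left child is the
  // "feature <= threshold" branch and the right child is the "Else" branch.
  val tree: Node =
    Split(0, 1.9,
      Leaf(2.0),
      Split(1, 1.7,
        Split(0, 4.9,
          Split(1, 1.6, Leaf(0.0), Leaf(1.0)),
          Split(1, 1.6, Leaf(1.0), Leaf(0.0))),
        Split(0, 4.8, Leaf(0.0), Leaf(1.0))))

  // Depth = number of edges on the longest root-to-leaf path (a lone leaf has depth 0).
  def depth(n: Node): Int = n match {
    case Leaf(_)           => 0
    case Split(_, _, l, r) => 1 + math.max(depth(l), depth(r))
  }

  // Total number of nodes, internal nodes plus leaves.
  def numNodes(n: Node): Int = n match {
    case Leaf(_)           => 1
    case Split(_, _, l, r) => 1 + numNodes(l) + numNodes(r)
  }

  def predict(n: Node, x: Array[Double]): Double = n match {
    case Leaf(p)           => p
    case Split(f, t, l, r) => if (x(f) <= t) predict(l, x) else predict(r, x)
  }
}
```

For instance, `[1.4,0.2]` falls in the `feature 0 <= 1.9` branch and gets 2.0, while `[4.0,1.0]` ends in a 0.0 leaf. In the prediction output of step 3, these two samples were predicted as Iris-setosa and Iris-versicolor respectively.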