返回 [Spark MLlib入门教程](http://mocom.xmu.edu.cn/article/show/5858ab782b2730e00d70fa08/0/1) --- ## 工作流(ML Pipelines)例子 本节以逻辑斯蒂回归为例,构建一个典型的机器学习过程,来具体介绍一下工作流是如何应用的。我们的目的是查找出所有包含"spark"的句子,即将包含"spark"的句子的标签设为1,没有"spark"的句子的标签设为0。 首先,导入工作流和逻辑斯蒂回归所需要的包。 ```scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.Row ``` 接下来,根据SparkContext来创建一个SQLContext,其中sc是一个已经存在的SparkContext;然后导入sqlContext.implicits._来实现RDD到Dataframe的隐式转换。 ```scala scala> val sqlContext = new SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@129bfa97 scala> import sqlContext.implicits._ import sqlContext.implicits._ ``` 然后,我们构建训练数据集。 ```scala scala> val training = sqlContext.createDataFrame(Seq( | (0L, "a b c d e spark", 1.0), | (1L, "b d", 0.0), | (2L, "spark f g h", 1.0), | (3L, "hadoop mapreduce", 0.0) | )).toDF("id", "text", "label") training: org.apache.spark.sql.DataFrame = [id: bigint, text: string, label: double] ``` 在这一步中我们要定义 Pipeline 中的各个工作流阶段PipelineStage,包括转换器和评估器,具体的,包含tokenizer, hashingTF和lr三个步骤。 ```scala scala> val tokenizer = new Tokenizer(). | setInputCol("text"). | setOutputCol("words") tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_5151ed4fa43e scala> val hashingTF = new HashingTF(). | setNumFeatures(1000). | setInputCol(tokenizer.getOutputCol). | setOutputCol("features") hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_332f74b21ecb scala> val lr = new LogisticRegression(). | setMaxIter(10). | setRegParam(0.01) lr: org.apache.spark.ml.classification.LogisticRegression = logreg_28a670ae952f ``` 有了这些处理特定问题的转换器和评估器,接下来就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。 ```scala scala> val pipeline = new Pipeline(). | setStages(Array(tokenizer, hashingTF, lr)) pipeline: org.apache.spark.ml.Pipeline = pipeline_4dabd24db001 ``` 现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。 ```scala scala> val model = pipeline.fit(training) model: org.apache.spark.ml.PipelineModel = pipeline_4dabd24db001 ``` 我们可以看到,model的类型是一个PipelineModel,这个管道模型将在测试数据的时候使用。所以接下来,我们先构建测试数据。 ```scala scala> val test = sqlContext.createDataFrame(Seq( | (4L, "spark i j k"), | (5L, "l m n"), | (6L, "spark a"), | (7L, "apache hadoop") | )).toDF("id", "text") test: org.apache.spark.sql.DataFrame = [id: bigint, text: string] ``` 然后,我们调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果。 ```scala scala> model.transform(test). | select("id", "text", "probability", "prediction"). | collect(). | foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => | println(s"($id, $text) --> prob=$prob, prediction=$prediction") | } (4, spark i j k) --> prob=[0.5406433544851421,0.45935664551485783], prediction=0.0 (5, l m n) --> prob=[0.9334382627383259,0.06656173726167405], prediction=0.0 (6, spark a) --> prob=[0.15041430048068286,0.8495856995193171], prediction=1.0 (7, apache hadoop) --> prob=[0.9768636139518304,0.023136386048169585], prediction=0.0 ``` 通过上述结果,我们可以看到,第4句和第6句中都包含"spark",其中第六句的预测是1,与我们希望的相符;而第4句虽然预测的依然是0,但是通过概率我们可以看到,第4句有46%的概率预测是1,而第5句、第7句分别只有7%和2%的概率预测为1,这是由于训练数据集较少,如果有更多的测试数据进行学习,预测的准确率将会有显著提升。
自动标签 : spark.ml 构建 机器 工作流 Spark 测试数据 Pipeline Tokenizer 预测 text PipelineModel 包含 HashingTF SQLContext
更多 [ 技术 ] 文章