返回 [Spark MLlib入门教程](http://mocom.xmu.edu.cn/article/show/5858ab782b2730e00d70fa08/0/1)
## 一、方法简介
协同过滤是一种基于一组兴趣相同的用户或项目进行的推荐,它根据邻居用户(与目标用户兴趣相似的用户)的偏好信息产生对目标用户的推荐列表。关于协同过滤的一个经典的例子就是看电影。如果你不知道哪一部电影是自己喜欢的或者评分比较高的,那么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐。而在问的时候,肯定都习惯于问跟自己口味差不多的朋友,这就是协同过滤的核心思想。因此,协同过滤是在海量数据中挖掘出小部分与你品味类似的用户,在协同过滤中,这些用户成为邻居,然后根据他们喜欢的东西组织成一个排序的目录推荐给你(如下图所示)。
![基于用户的协同过滤推荐机制的基本原理](http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/01/user-item.png "")
协同过滤算法主要分为基于用户的协同过滤算法和基于项目的协同过滤算法。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。Spark MLlib实现了 [交替最小二乘法](http://dl.acm.org/citation.cfm?id=1608614) (ALS) 来学习这些隐性语义因子。
## 二、隐性反馈 vs 显性反馈
显性反馈行为包括用户明确表示对物品喜好的行为,隐性反馈行为指的是那些不能明确反应用户喜好的行为。在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈,例如页面游览,点击,购买,喜欢,分享等等。
基于矩阵分解的协同过滤的标准方法,一般将用户商品矩阵中的元素作为用户对商品的显性偏好。在 MLlib 中所用到的处理这种数据的方法来源于文献: [Collaborative Filtering for Implicit Feedback Datasets](http://dx.doi.org/10.1109/ICDM.2008.22) 。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分,而是与所观察到的用户偏好强度关联起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。
## 三、示例
下面代码读取spark的示例文件,文件中每一行包括一个用户id、商品id和评分。我们使用默认的`ALS.train()` 方法来构建推荐模型并评估模型的均方差。
### 1. 导入需要的包:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
```
### 2. 读取数据:
首先,读取文本文件,把数据转化成rating类型,即[Int, Int, Double]的RDD;
```scala
scala> val data = sc.textFile("../data/mllib/als/test.data")
data: org.apache.spark.rdd.RDD[String] = ../data/mllib/als/test.data MapPartitio
nsRDD[1] at textFile at :21
scala> val ratings = data.map(_.split(',') match { case Array(user, item, rate)
=> Rating(user.toInt, item.toInt, rate.toDouble)})
ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
= MapPartitionsRDD[2] at map at :26
```
可以把数据打印出来看一下:
```scala
scala> ratings.foreach{x => println(x)}
Rating(1,1,5.0)
Rating(3,2,5.0)
Rating(1,2,1.0)
Rating(3,3,1.0)
Rating(1,3,5.0)
Rating(3,4,5.0)
Rating(1,4,1.0)
Rating(4,1,1.0)
Rating(2,1,5.0)
Rating(4,2,5.0)
Rating(2,2,1.0)
Rating(4,3,1.0)
Rating(2,3,5.0)
Rating(4,4,5.0)
Rating(2,4,1.0)
Rating(3,1,1.0)
```
其中Rating中的第一个int是user编号,第二个int是item编号,最后的double是user对item的评分。
### 3. 构建模型
划分训练集和测试集,比例分别是0.8和0.2。
```scala
scala> val splits = ratings.randomSplit(Array(0.8, 0.2))
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rati
ng]] = Array(MapPartitionsRDD[5] at randomSplit at :30, MapPartitionsRD
D[6] at randomSplit at :30)
scala> val training = splits(0)
training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
= MapPartitionsRDD[5] at randomSplit at :30
scala> val test = splits(1)
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = M
apPartitionsRDD[6] at randomSplit at :30
```
指定参数值,然后使用ALS训练数据建立推荐模型:
```scala
scala> val rank = 10
rank: Int = 10
scala> val numIterations = 10
numIterations: Int = 10
scala> val model = ALS.train(training, rank, numIterations, 0.01)
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apac
he.spark.mllib.recommendation.MatrixFactorizationModel@1f14d4a
```
在 MLlib 中的实现有如下的参数:
- `numBlocks` 是用于并行化计算的分块个数 (设置为-1,为自动配置)。
- `rank` 是模型中隐语义因子的个数。
- `iterations` 是迭代的次数。
- `lambda` 是ALS的正则化参数。
- `implicitPrefs` 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
- `alpha` 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。
可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。上面的例子中调用了 ALS.train(ratings, rank, numIterations, 0.01) ,我们还可以设置其他参数,调用方式如下:
```scala
val model = new ALS()
.setRank(params.rank)
.setIterations(params.numIterations)
.setLambda(params.lambda)
.setImplicitPrefs(params.implicitPrefs)
.setUserBlocks(params.numUserBlocks)
.setProductBlocks(params.numProductBlocks)
.run(training)
```
### 4. 利用模型进行预测
从 test训练集中获得只包含用户和商品的数据集 :
```scala
scala> val testUsersProducts = test.map { case Rating(user, product, rate) =>
| (user, product)
| }
usersProducts: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3868] at
map at :34
```
使用训练好的推荐模型对用户商品进行预测评分,得到预测评分的数据集:
```scala
scala> val predictions =
| model.predict(testUsersProducts).map { case Rating(user, product, rate)
| =>((user, product), rate)
| }
predictions: org.apache.spark.rdd.RDD[((Int, Int), Double)] = MapPartitionsRDD[3
877] at map at :45
```
将真实评分数据集与预测评分数据集进行合并。这里,Join操作类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的。
```scala
scala> val ratesAndPreds = test.map { case Rating(user, product, rate) =>
| ((user, product), rate)
| }.join(predictions)
ratesAndPreds: org.apache.spark.rdd.RDD[((Int, Int), (Double, Double))] = MapPar
titionsRDD[3881] at join at :48
```
我们把结果输出,对比一下真实结果与预测结果:
```scala
scala> ratesAndPreds.foreach(println)
((3,1),(1.0,-0.22756397347958202))
((4,2),(5.0,4.388061223429636))
((4,1),(1.0,-0.1847678805249373))
```
比如,第一条结果记录((3,1),(1.0,-0.22756397347958202))中,(3,1)分别表示3号用户和1号商品,而1.0是实际的估计分值,-0.22756397347958202是经过推荐的预测分值。
然后计算均方差,这里的r1就是真实结果,r2就是预测结果:
```scala
scala> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
| val err = (r1 - r2)
| err * err
| }.mean()
MSE: Double = 1.0950191019929887
```
打印出均方差值 :
```scala
scala> println("Mean Squared Error = " + MSE)
Mean Squared Error = 1.0950191019929887
```
我们可以看到打分的均方差值为1.09左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。上面的例子只是对测试集进行了评分,我们还可以进一步的通过调用model.recommendProducts给特定的用户推荐商品以及model.recommendUsers来给特定商品推荐潜在用户。