推荐系统中的余弦相似度计算

介绍相似度计算的方法,着重于spark编程方面。。。

背景介绍:

在推荐系统中,基于内容的推荐方法和协同过滤方法都会用到相似度计算。最简单的就是计算行(用户)或列(项)的余弦或其他相似性,并推荐k个用户最喜欢的最近邻项。

在Word2vector中,文本内容的处理被简化成k维向量空间中的向量运算,其向量之间的相似度可以用来表示文本语义的相似度。

由此可知,相似度度量是一个很重要并且很基础的内容,因此本文介绍一下,spark中计算相似度的若干种方法。

我们假设已经得到了向量或者矩阵,本文主要讨论如何使用spark来计算他们之间的相似度。(作为一个编程 slow learner,好不容易把Python用顺手了,又让我学Scala,想哭:sob:).

数据准备:

本文开始使用一个简单的例子,使用GitHub上一个中文词向量项目中预训练好的词向量作为我们的实验数据。Chinese Word Vector.这个项目提供了100多个不同形式的(密集和稀疏),上下文特征(单词,ngram,字符等)和语料库训练的中文单词向量(嵌入)。可以轻松获得具有不同属性的预先训练的向量,并将它们用于其他任务。

格式:

预训练的向量文件是txt格式,第一行表示元数据信息:第一个数字表示本文件的单词个数,第二个表示向量维数。

我们以微博语料库训练的、context feature为Word+Ngram的向量包sgns.weibo.bigram-char为例:

1
191321 300

表示本文件的词数量以及向量维度为300维。

取某向量示意:

1
我们 0.838326 0.401992 0.312874…… 0.886258 [total 300-d]

每行数据以空格" "分隔。

代码:

我们读入该词向量包,在spark中自动存成RDD数据格式。

假设我们取”春”、“夏”,”秋”,”冬”四个词:

1
2
3
4
5
6
7
8
9
10
//环境
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("CosineSimilarity")
val sc = new SparkContext(conf)

//读取数据Load and parse the data file.
val rows = sc.textFile("sgns.weibo.bigram-char").map{line => {val values = line.split(" ");val key = values(0);val word = values.drop(1).map(_.toDouble);(key,word)}}.filter(x => x._1.equals("春")|| x._1.equals("夏") || x._1.equals("秋") || x._1.equals("冬"))
//rows: org.apache.spark.rdd.RDD[(String, Array[Double])] = MapPartitionsRDD[3] at filter at <console>:25
rows.take(1).foreach(println)
//(春,[D@ce8b59e)

My approach:使用breeze库计算

breeze 作为一个scala 线性代数库 ,spark mlib机器学习使用了其中实现 ,最重要的是它相当于python的numpy 使用方法也很类似,这很重要。具体的breeze的使用见:saprk向量矩阵的使用

step1:数据转换

首先,我们将之前读入的RDD数据要转换为breeze中向量或者矩阵的形式。

尝试1:

将读入的RDD数据直接转换为breeze vector

1
2
3
4
5
6
7
8
val dm = rows.map(x => DenseVector(x._2))
//dm: org.apache.spark.rdd.RDD[breeze.linalg.DenseVector[Double]] = MapPartitionsRDD[61] at map at <console>:46
//可见这仍然是一个RDD数据,只是每条数据变成了一个breeze的densevector格式

//或者
val dm = rows.map(x => DenseMatrix(x._2))
//dm: org.apache.spark.rdd.RDD[breeze.linalg.DenseMatrix[Double]] = MapPartitionsRDD[60] at map at <console>:46
//和上面同理,RDD[DenseMatrix]格式

因为是RDD格式,是没有办法进行各种breeze向量或者矩阵的操作的

1
2
dm.t
// error: could not find implicit value for parameter op: breeze.linalg.support.CanTranspose[org.apache.spark.rdd.RDD[breeze.linalg.DenseMatrix[Double]]

即便每条转换成向量,因为还是RDD数据,也不行:

1
2
3
4
5
6
7
8
val v2 = DenseVector(denserows.filter(x => x._1.equals("夏")))
//v2: breeze.linalg.DenseVector[org.apache.spark.rdd.RDD[(String, Array[Double])]] = DenseVector(MapPartitionsRDD[67] at filter at <console>:46)

val v2 = DenseVector(denserows.filter(x => x._1.equals("夏")))
//v2: breeze.linalg.DenseVector[org.apache.spark.rdd.RDD[(String, Array[Double])]] = DenseVector(MapPartitionsRDD[68] at filter at <console>:46)

v1.dot(v2)
//<console>:49: error: diverging implicit expansion for type breeze.linalg.operators.OpMulInner.Impl2[breeze.linalg.DenseVector[org.apache.spark.rdd.RDD[(String, Arr ay[Double])]],breeze.linalg.DenseVector[org.apache.spark.rdd.RDD[(String, Array[Double])]],That]

尝试2:

RDD->Array->DenseMatrix

原始读进去的数据rowsrdd.RDD[(String, Array[Double])]格式。

  • 首先将其转换成Array[Array[Double]]格式,并读取行列数,此时一个二维数组,在铺平成一维数组。
1
2
3
4
5
6
7
8
9
10
11
val Array_rows = rows.map(x => x._2).collect()
//Array_rows: Array[Array[Double]] = Array(Array(0.729903, 1.011596, 0.11623, 0.671848, -0.413983, 0.38072, 0.055408, 0.293588,, 0.186168, ...

val row = Array_rows.length
//row: Int = 4

val col = Array_rows(0).length
//col: Int = 300

val flatten = Array_rows.flatten
//flatten: Array[Double] = Array(0.729903, 1.011596, 0.11623, 0.671848, -0.413983, 0.38072, 0.055408, 0.293588, 0.148698, 0.231511, -0.084237, -0.101489, 0.656804, , -0.456365, 0.906...
  • 从一维数组创建Breeze矩阵:
1
2
3
4
5
6
7
8
import breeze.linalg._
//import breeze.linalg._
val mat = new DenseMatrix(col,row,flatten).t
//mat: breeze.linalg.DenseMatrix[Double] =
//0.729903 1.011596 0.11623 0.671848 -0.413983 0.38072 ... (300 total)
//0.5044 0.560648 0.111501 0.105221 -0.223379 -0.046827 ...
//0.835371 1.232148 0.080021 0.206688 -0.340422 -0.084726 ...
//0.482624 0.497307 0.508875 -0.241568 -0.61349 0.132329 ...

注意:因为平铺数组(按行)与创建二维数组(按列)的方式正好相反,因此在创建二维矩阵时需要交换行数和列数,并在最后把矩阵转置即可。不可以在创建的时候交换行列数哦

step2:建立相似度算法计算公式

尝试1:

有了breeze矩阵之后,就可以对其进行索引操作:

1
2
3
//取第一行,即"春"的word vector
mat(0,::)
//res4: breeze.linalg.Transpose[breeze.linalg.DenseVector[Double]] = Transpose(DenseVector(0.729903, 1.011596, 0.11623, 0.671848, -0.413983, 0.38072, 0.055408, 0.293588, 0.148698, 0.231511, -0.084237, -0.101489, 0.656804, 0.081559, -0.40669, 0.019526, -0.078754, -0.536043, -0.367921, 0.852284, 0.306367, -0.322883, -0.758667, 0.096341, -1.008443, 0.047269, 0.078617, 0.528206, 0.586177, 0.331057, -0.585402, -0.626427, -0.337651, 0.222883, 0.251367, 0.363349, 0.374307, -0.230228, -0.032158, 0.47373, 0.467167, 0.52929, -0.435956, -0.008174, -0.358098, 0.658414, 0.378351, -0.710465, -0.415881, -0.805342, -0.475245, -0.658201, -0.387944, 0.215591, -0.092659, 0.389003, -0.872504, 0.43005, 0.423282, 1.053913, -0.144121, 0.09983, 0.530485, 0.817919, 0.10237, 0.211079, 1.78E-4, 0.092287, 0.450624, ...

依次可以得到各个vector,并计算余弦相似度。breeze库支持点积(内积),对应元素操作等,因此计算方便,但是,这种对量量向量计算相似度的话,需要计算六次,有点麻烦。我们直接对矩阵进行操作,如下:

尝试2:

  • 设$A_i$为breeze矩阵第$i$个行向量,则矩阵表示为:

$$
mat = \begin{bmatrix}
A_1 \
A_2 \
A_3 \
A_4 \
\end{bmatrix}
$$

  • 假设我们计算$A_1,A_2$的余弦相似度:

$$
cos(A_1,A_2) = \frac{A_1\cdot A_2}{|A_1|\cdot |A_2|} \=\frac{A_1\cdot A_2}{\sqrt{\sum_i^{300}a_{1i}^2}\cdot \sqrt{\sum_i^{300}a_{2i}^2}}
$$

  • 所以我们可以通过直接操作矩阵来计算:

$$
mat \cdot mat^T = \begin{bmatrix}
A_1 \
A_2 \
A_3 \
A_4 \
\end{bmatrix}\cdot \begin{bmatrix}
A_1^T&
A_2 ^T&
A_3 ^T&
A_4 ^T&
\end{bmatrix} \
= \begin{bmatrix}
A_1 \cdot A_1^T & A_1 \cdot A_2^T & A_1 \cdot A_3^T & A_1 \cdot A_4^T \
…… & A_2 \cdot A_2^T & A_2 \cdot A_3^T & A_2 \cdot A_4^T \
…… &…… & A_3 \cdot A_3^T & A_3 \cdot A_4^T \
…… & …… & …… & A_4 \cdot A_4^T \
\end{bmatrix}
$$

代码:

1
2
3
4
5
6
mat*mat_t
//res70: breeze.linalg.DenseMatrix[Double] =
//69.13677014067899 24.334118226385 32.43453861723301 30.920464306442017
//24.334118226385 75.83683037460902 29.086041093175016 28.565852270159983
//32.43453861723301 29.086041093175016 62.27936389297101 39.45101628343302
//30.920464306442017 28.565852270159983 39.45101628343302 70.68849670204202
  • 对$mat$的每一行求模:

$$
|mat| = \begin{bmatrix}
|A_1| \
|A_2| \
|A_3| \
|A_4 |\
\end{bmatrix} = \begin{bmatrix}
\sqrt{\sum_i^{300} a_{1i}^2} \
\sqrt{\sum_i^{300} a_{2i}^2}\
\sqrt{\sum_i^{300} a_{3i}^2} \
\sqrt{\sum_i^{300} a_{4i}^2} \
\end{bmatrix}
$$

代码:

1
2
3
4
5
6
7
val A1_2 = sum(mat :*mat,Axis._1)
//对每个元素求平方,再按行求和
//warning: there was one deprecation warning; re-run with -deprecation for details
//A1_2: breeze.linalg.DenseVector[Double] = DenseVector(69.13677014067899, 75.83683037460902, 62.27936389297101, 70.68849670204202)

val A1_abs = sqrt(A1_2)
//A1_abs: breeze.linalg.DenseVector[Double] = DenseVector(8.314852382374506, 8.708434438784565, 7.891727560741755, 8.40764513416462)
  • 最后求的相似度矩阵为,:/表示对应元素相除:

$$
cosmat = mat\cdot mat^T :/ (|mat|\cdot |mat|^T) \
=\begin{bmatrix}
A_1 \
A_2 \
A_3 \
A_4 \
\end{bmatrix}\cdot \begin{bmatrix}
A_1^T&
A_2 ^T&
A_3 ^T&
A_4 ^T
\end{bmatrix} \
:/ \
\begin{bmatrix}
|A_1| \
|A_2| \
|A_3|\
|A_4 |\
\end{bmatrix}\cdot \begin{bmatrix}
|A_1|&
|A_2| &
|A_3|&
|A_4 |
\end{bmatrix} \
= \begin{bmatrix}
\frac{A_1 \cdot A_1^T}{|A_1||A_1|} & \frac{A_1 \cdot A_2^T}{|A_1||A_2|} & \frac{A_1 \cdot A_3^T}{|A_1||A_3|} & \frac{A_1 \cdot A_4^T}{|A_1||A_4|} \
…… & \frac{A_2 \cdot A_2^T}{|A_2||A_2|} & \frac{A_2 \cdot A_3^T}{|A_2||A_3|} & \frac{A_2\cdot A_4^T}{|A_2||A_4|} \
…… &…… & \frac{A_3 \cdot A_3^T}{|A_3||A_3|}& \frac{A_3 \cdot A_4^T}{|A_3||A_4|}\
…… & …… & …… & \frac{A_4 \cdot A_4^T}{|A_4||A_4|}\
\end{bmatrix} \
=\begin{bmatrix}
cos(A_1,A_1) & cos(A_1,A_2) & cos(A_1,A_3) & cos(A_1,A_4) \
…… &…… & …… & …… \
…… &…… & …… & …… \
…… & …… & …… & cos(A_4,A_4)\
\end{bmatrix}
$$

代码:

1
2
3
4
5
6
7
8
9
10
val mo = A1_abs*A1_abs.t
//mo: breeze.linalg.DenseMatrix[Double] =
69.13677014067899 72.40934684008003 65.61854970948413 69.9083281739681
72.40934684008003 75.83683037460904 68.7245920714688 73.21742643543864
65.61854970948413 68.7245920714688 62.279363892971006 66.35084482622324
69.9083281739681 73.21742643543864 66.35084482622324 70.688496702042

//最后结果
mat*mat_t :/mo
//res75: breeze.linalg.DenseMatrix[Double] =

最后的相似度矩阵为:

1
2
3
4
1.0                 0.3360632195747908   0.4942891722056013  0.4423001538457035
0.3360632195747908 0.9999999999999998 0.4232261002426548 0.39015100176115397
0.4942891722056013 0.4232261002426548 1.0000000000000002 0.5945819738506353
0.4423001538457035 0.39015100176115397 0.5945819738506353 1.0000000000000002

Method1:map-reduce方法

这是种很笨的方法

step1.创建数组

1
2
val array = rows.map(x => x._2).collect
//array: Array[Array[Double]] = Array(Array(0.729903, 1.011596, 0.11623, 0.671848, -0.413983, 0.38072, 0.055408, 0.29358, -0.45...

step2.组成行向量

1
2
3
4
5
val vector1 = array(0).toVector
//vector1: Vector[Double] = Vector(0.729903, -0.456365, 0.9...
val vector2 = array(1).toVector
....
val vector4 = array(3).toVector

step3.计算相似度

以vector1和vector2计算余弦距离为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
vector1.zip(vector2)
res30: scala.collection.immutable.Vector[(Double, Double)] = Vector((0.729903,0.5044), (1.011596,0.560648), (0.11623,0.111501), (0.671848,0.105221), (-0.413983,-0.223379), (0.38072,-0.046827), (0.055408,0.606104), (0.293588,0.341248), (0.148698,0.483594), (0.231511,0.514198), (-0.084237,-0.207319), (-0.101489,-0.655622), (0.656804,0.940628), (0.081559,0.88866), (-0.40669,-0.121981), (0.019526,-0.05621), (-0.078754,-0.076134), (-0.536043,0.358388), (-0.367921,-0.052004), (0.852284,0.622279), (0.306367,-0.386357), (-0.322883,0.583146), (-0.758667,0.073162), (0.096341,-0.225592), (-1.008443,0.384841), (0.047269,-0.013371), (0.078617,0.430729), (0.528206,0.547542), (0.586177,0.740262), (0.331057,-0.135318), (-0.585402,0.035509), (-0.626427,-0.284685), (-0.337651,-0.876266), (0.222883,0.0095...

//计算分子
val member = vector1.zip(vector2).map(d => d._1*d._2).reduce(_ + _).toDouble
//member: Double = 24.334118226385

//计算第一项分母
val temp = math.sqrt(vector1.map(num => math.pow(num,2)).reduce(_ + _))
temp: Double = 8.314852382374506

//计算第二项分母
val temp2 = math.sqrt(vector2.map(num => math.pow(num,2)).reduce(_ + _))
temp2: Double = 8.708434438784565

//相似度
val res = member/(temp*temp2)
res: Double = 0.3360632195747908

和之前求出的相似度矩阵的第一行第二列相同,为春和夏的相似度。

Method2:RowMatrix方法

spark自带org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

这个类有个方法叫 columnSimilarities 就是计算矩阵列向量间的两两余弦相似度

返回类型是CoordinateMatrix,一个矩阵,该矩阵下标为(x,y)的元素就是原矩阵第x列和第y列的相似度。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
rows
//res49: org.apache.spark.rdd.RDD[(String, Array[Double])] = MapPartitionsRDD[3] at filter at <console>:25

val rows_2 = rows.map(x => Vectors.dense(x._2))//rows是
//rows_2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[31] at map at <console>:31

val mat = new RowMatrix(rows_2)
//mat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@13e3903e

// Compute similar columns perfectly, with brute force.
val exact = mat.columnSimilarities()
//exact: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix = org.apache.spark.mllib.linalg.distributed.CoordinateMatrix@440f4070

他计算的是列与列之间的相似度,生成的是一个坐标矩阵,无法直接访问:

1
2
3
4
5
6
7
8
9
10
11
12
//报错
exact(0,1)
//<console>:32: error: org.apache.spark.mllib.linalg.distributed.CoordinateMatrix does not take parameters

//打印行列数可以
exact.numCols()
//res51: Long = 300
exact.numRows()
//res52: Long = 300

//想要打印所有元素,只能使用下面方法:
exact.entries.foreach(println)

可以看出列相似度矩阵不是我们想要的,出来$300\times300$大小。

而且我也不知道怎么使用RDD矩阵的转置,而且RowMatrix没有专转置方法所有就不搞了。。。。。

几种Vector创建方式

1.mllib

MLLib提供了一序列基本数据类型以支持底层的机器学习算法。主要的数据类型包括:本地向量、标注点(Labeled Point)、本地矩阵、分布式矩阵等。单机模式存储的本地向量与矩阵,以及基于一个或多个RDD的分布式矩阵。其中本地向量与本地矩阵作为公共接口提供简单数据模型,底层的线性代数操作由Breeze库和jblas库提供。标注点类型用来表示监督学习(Supervised Learning)中的一个训练样本。

1
2
3
import org.apache.spark.mllib.linalg.{Vector, Vectors}
val dv: Vector = Vectors.dense(2.0, 0.0, 8.0)
//dv: org.apache.spark.mllib.linalg.Vector = [2.0,0.0,8.0]

无法进行zip操作。

2.toVector方法

EEE,这是RDD[vector]自带的方法

1
2
scala> val rows_2 = rows.map(x => x._2.toVector)
//rows_2: org.apache.spark.rdd.RDD[Vector[Double]] = MapPartitionsRDD[39] at map at <console>:31

3.Breeze库的DenseVector方法

1
2
val dm = rows.map(x => DenseVector(x._2))
//dm: org.apache.spark.rdd.RDD[breeze.linalg.DenseVector[Double]] = MapPartitionsRDD[40] at map at <console>:31

Reference

  1. 怎样使用Spark计算一个集合各个元素(向量表示的)的两两之间的余弦相似度
  2. RowMatrix 列相似度计算
  3. breeze向量矩阵的使用
  4. scala如何从文件读取数据并转换成矩阵
  5. Spark 相似度算法,map,zip,reduce
  6. 分布式矩阵之坐标矩阵使用方法
  7. 子雨大数据之spark教程
  8. 使用inverted index方法实现大型稀疏向量相似度计算
  9. Spark 协同过滤ALS之Item2Item相似度计算优化
  10. Spark Mllib里相似度度量(基于余弦相似度计算不同用户之间相似性)
  11. 【干货】推荐系统中的机器学习算法与评估实战
  12. Spark成长之路(6)-Correlation
  13. 【入门篇】如何在spark分布式矩阵实现协同过滤推荐?
  14. Data Types - RDD-based API