python - 两个 pyspark 数据帧的余弦相似度

标签 python apache-spark pyspark apache-spark-sql

我有一个 PySpark DataFrame df1,看起来像:

CustomerID  CustomerValue CustomerValue2 
12          .17           .08

我有第二个 PySpark DataFrame,df2

 CustomerID  CustomerValue CustomerValue
 15          .17           .14
 16          .40           .43
 18          .86           .09

我想计算两个数据帧的余弦相似度。还有类似的东西

 CustomerID  CustomerID   CosineCustVal CosineCustVal
 15          12           1            .90
 16          12           .45          .67
 18          12           .8           .04

最佳答案

您只能计算两个向量的余弦相似度,而不能计算两个数字的余弦相似度。也就是说,如果名为 CustomerValue 的列是向量的不同组成部分,该向量表示您想要获得两个客户之间相似性的特征,则可以通过转置数据框然后对 CuatomerValues 进行联接来实现。

可以通过爆炸来完成转置(有关转置数据框的更多详细信息 here ):

from pyspark.sql import functions as F

kvs = F.explode(F.array([
        F.struct(F.lit(c).alias('key'), F.columm(c).alias('value')) for c in ['CustomerValue1', 'CustomerValue2']
      ])).alias('kvs')

dft1 = (df1.select(['CustomerID', kvs])
        .select('CustomerID', F.column('kvs.name').alias('column_name'), F.column('kvs.value').alias('column_value'))
        )
dft2 = (df2.select(['CustomerID', kvs])
        .select('CustomerID', F.column('kvs.name').alias('column_name'), F.column('kvs.value').alias('column_value'))
        )

其中dft1dft2表示转置的数据帧。转置它们后,您可以将它们加入到列名称上:

dft2 = (dft2.withColumnRenamed('CustomerID', 'CustomerID2')
        .withColumnRenamed('column_value', 'column_value2')
       )
cosine = (dft1.join(dft2, dft1.column_name = dft2.column_name)
          .groupBy('CustomerID' , 'CustomerID2')
          .agg(F.sum(F.column('column_value')*F.column('column_value2')).alias('cosine_similarity'))
         )

现在,在 cosine 中,您有三列:第一个和第二个数据帧的 CustomerID 以及余弦相似度(前提是首先对值进行归一化)。这样做的优点是您只有具有非零相似度的 CustomerID 对的行(在某些 CustomerID 值为零的情况下)。举个例子:

df1:

CustomerID CustomerValue CustomerValue2
12         .17           .08

df2:

CustomerID CustomerValue CustomerValue
15         .17           .14
16         .40           .43
18         .86           .09

余弦:

CustomID CustomID2 cosine_similarity
12       15        .0401
12       16        .1024
12       18        .1534

当然,这些还不是真正的余弦相似度,您需要首先对值进行标准化。您可以通过分组来做到这一点:

(df.groupBy('CustomerID')
 .agg(F.sqrt(F.sum(F.column('column_value')*F.column('column_value'))).alias('norm'))
 .select('CustomerID', F.column('column_name'), (F.column('column_value')/F.column('norm')).alias('column_value_norm'))
)

对列进行归一化后,您的余弦相似度将变为以下内容:

CustomID CustomID2 cosine_similarity
12       15        .970
12       16        .928
12       18        .945

相似度值较大是由于维度较低(仅两个分量)。

关于python - 两个 pyspark 数据帧的余弦相似度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52542903/

相关文章:

python - 将 DataFrame 拆分为仅包含给定常量值的组

python - 使用 matplotlib.collection.LineCollection 时发生内存泄漏

apache-spark - 分区如何映射到 Spark 中的任务?

apache-spark - Spark如何计算hash shuffle中的reducer数量?

pyspark - Dataproc 主节点和工作器节点之间的 Python 版本不同

hadoop - 由于空间问题导致 Spark 作业失败

python - 如何通过计算解决 'four points, two distance, unique shapes' 问题

python - pygame.display.update 更新整个屏幕

斯卡拉 Spark : how to use dataset for a case class with the schema has snake_case?

apache-spark - 如何缓存 Spark 数据帧并在另一个脚本中引用它