scala - 计算余弦相似度 Spark Dataframe

标签 scala apache-spark apache-spark-sql apache-spark-mllib

我正在使用 Spark Scala 来计算 Dataframe 行之间的余弦相似度。

数据帧格式如下

root
    |-- SKU: double (nullable = true)
    |-- Features: vector (nullable = true)

下面的数据框示例

    +-------+--------------------+
    |    SKU|            Features|
    +-------+--------------------+
    | 9970.0|[4.7143,0.0,5.785...|
    |19676.0|[5.5,0.0,6.4286,4...|
    | 3296.0|[4.7143,1.4286,6....|
    |13658.0|[6.2857,0.7143,4....|
    |    1.0|[4.2308,0.7692,5....|
    |  513.0|[3.0,0.0,4.9091,5...|
    | 3753.0|[5.9231,0.0,4.846...|
    |14967.0|[4.5833,0.8333,5....|
    | 2803.0|[4.2308,0.0,4.846...|
    |11879.0|[3.1429,0.0,4.5,4...|
    +-------+--------------------+

我尝试转置矩阵并检查以下提到的链接。 Apache Spark Python Cosine Similarity over DataFrames , calculating-cosine-similarity-by-featurizing-the-text-into-vector-using-tf-idf但我相信有更好的解决方案

我尝试了下面的示例代码

val irm = new IndexedRowMatrix(inClusters.rdd.map {
  case (v,i:Vector) => IndexedRow(v, i)


}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities

但我收到以下错误

Error:(80, 12) constructor cannot be instantiated to expected type;
 found   : (T1, T2)
 required: org.apache.spark.sql.Row
      case (v,i:Vector) => IndexedRow(v, i)

我检查了以下链接 Apache Spark: How to create a matrix from a DataFrame?但无法使用 Scala 做到这一点

最佳答案

  • DataFrame.rdd 返回 RDD[Row] 而不是 RDD[(T, U)]。您必须对Row进行模式匹配或直接提取有趣的部分。
  • ml VectorDatasets 一起使用,因为 Spark 2.0 与 mllib Vector 由旧 API 使用。您必须将其转换为与 IndexedRowMatrix 一起使用。
  • 索引必须是Long而不是字符串。
import org.apache.spark.sql.Row

val irm = new IndexedRowMatrix(inClusters.rdd.map {
  Row(_, v: org.apache.spark.ml.linalg.Vector) => 
    org.apache.spark.mllib.linalg.Vectors.fromML(v)
}.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })

关于scala - 计算余弦相似度 Spark Dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47010126/

相关文章:

apache-spark - 如何访问 mapGroupsWithState 中的 stateSnapshot 或在流之间共享 GroupState?

apache-spark - Zeppelin 抛出 java.lang.OutOfMemoryError : Java heap space

apache-spark - 为什么 Spark-Shell 失败并显示 “error: not found: value spark” ?

scala - 我应该将库发布到哪个版本的 Scala?

python - 如何在 Windows 上安装 Polynote?

scala - 在 SBT 中将参数传递给多个任务

java - Spark Dataframe Write to CSV 在 Standalone Cluster Mode 下创建_temporary 目录文件

java - Scala:如何使用 Java 回调 API

apache-spark - 通过 spark-submit 将 JAR 提交到 Spark 时出现 ClassNotFoundException

apache-spark - 在 Spark 中通过 SFTP 读取文件