scala - 添加两个 RDD[mllib.linalg.Vector]

标签 scala apache-spark apache-spark-mllib

我需要添加两个存储在两个文件中的矩阵。
latest1.txt的内容和 latest2.txt有下一个str:

1 2 3
4 5 6
7 8 9

I am reading those files as follows:

scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

我想添加r1,r2。那么,有什么办法可以添加这两个RDD[mllib.linalg.Vector] s 在 Apache-Spark 中。

最佳答案

这实际上是一个很好的问题。我经常使用 mllib,但没有意识到这些基本的线性代数运算不容易访问。

关键是底层的微风 向量具有您期望的所有线性代数操作 - 当然包括您特别提到的基本元素明智加法。

然而,微风实现通过以下方式对外界隐藏:

[private mllib]

那么,从外部世界/公共(public) API 的角度来看,我们如何访问这些原语?

其中一些已经暴露:例如平方和:
/**
 * Returns the squared distance between two Vectors.
 * @param v1 first Vector.
 * @param v2 second Vector.
 * @return squared distance between two Vectors.
 */
def sqdist(v1: Vector, v2: Vector): Double = { 
  ...
}

然而,这些可用方法的选择是有限的——实际上不包括基本操作,包括元素加法、减法、乘法等。

所以这是我能看到的最好的:
  • 将向量转换为微风:
  • 在微风中执行向量操作
  • 从微风转换回 mllib 向量

  • 这是一些示例代码:
    val v1 = Vectors.dense(1.0, 2.0, 3.0)
    val v2 = Vectors.dense(4.0, 5.0, 6.0)
    val bv1 = new DenseVector(v1.toArray)
    val bv2 = new DenseVector(v2.toArray)
    
    val vectout = Vectors.dense((bv1 + bv2).toArray)
    vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]
    

    关于scala - 添加两个 RDD[mllib.linalg.Vector],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28232829/

    相关文章:

    apache-spark - JAVA 中的 Spark IntArrayParm

    scala - 在 sbt 运行任务中重定向标准输入/标准输出

    scala - 当 Scala "Future"被垃圾收集时会发生什么?

    scala - Spark程序版本冲突的最佳解决方案

    scala - 如何从 spark 中的输出控制台抑制 "Stage 2===>"?

    java - Scala如何将 "None"排序到底部(如果存在)并选择每组中的第一行?

    apache-spark - 内存不足错误: Java heap space in Spark

    apache-spark - Google Cloud Dataproc 删除 BigQuery 表不起作用

    apache-spark - 列变换后的 Pyspark 随机森林特征重要性映射

    scala - pyspark 与 scala 中的 FPgrowth 计算关联