pyspark - PySpark 中的矩阵乘法 A^T * A

标签 pyspark matrix-multiplication

我昨天问了一个类似的问题-Matrix Multiplication between two RDD[Array[Double]] in Spark - 但是我决定转移到 pyspark 来做到这一点。我在加载和重新格式化数据方面取得了一些进展 - Pyspark map from RDD of strings to RDD of list of doubles - 但是矩阵乘法很困难。先分享一下我的进展:

matrix1.txt
1.2 3.4 2.3 
2.3 1.1 1.5
3.3 1.8 4.5
5.3 2.2 4.5
9.3 8.1 0.3
4.5 4.3 2.1 

共享文件很困难,但这就是我的 matrix1.txt 文件的样子。它是一个以空格分隔的文本文件,包括矩阵的值。接下来是代码:
# do the imports for pyspark and numpy
from pyspark import SparkConf, SparkContext
import numpy as np

# loadmatrix is a helper function used to read matrix1.txt and format
# from RDD of strings to RDD of list of floats
def loadmatrix(sc):
    data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line])
    return(data) 

# this is the function I am struggling with, it should take a line of the 
# matrix (formatted as list of floats), compute an outer product with itself
def AtransposeA(line):
    # pseudocode for this would be...
    # outerprod = compute line * line^transpose     
    # return(outerprod)

# here is the main body of my file    
if __name__ == "__main__":
    # create the conf, sc objects, then use loadmatrix to read data
    conf = SparkConf().setAppName('SVD').setMaster('local')
    sc = SparkContext(conf = conf)
    mymatrix = loadmatrix(sc)

    # this is pseudocode for calling AtransposeA
    ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts)

    # the SVD of ATA is computed below
    U, S, V = np.linalg.svd(ATA)

    # ...

我的方法如下 - 要进行矩阵乘法 A^T * A,我创建了一个计算 A 行的外积的函数。所有外积的元素总和就是我想要的乘积。然后我在 map 函数中调用 AtransposeA(),这样它就在矩阵的每一行上执行,最后我使用 reduce() 来添加结果矩阵。

我正在努力思考 AtransposeA 函数的外观。我怎样才能在 pyspark 中做这样的外积?预先感谢您的帮助!

最佳答案

首先,考虑为什么要使用 Spark为了这。听起来您的所有数据都适合内存,在这种情况下,您可以使用 numpypandas以一种非常直接的方式。

如果您的数据没有结构化以便行是独立的,那么它可能无法通过将行组发送到不同节点来并行化,这就是使用 Spark 的全部意义所在。 .

话虽如此……这里有一些 pyspark (2.1.1) 我认为可以满足您的需求的代码。

# read the matrix file
df = spark.read.csv("matrix1.txt",sep=" ",inferSchema=True)
df.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|1.2|3.4|2.3|
|2.3|1.1|1.5|
|3.3|1.8|4.5|
|5.3|2.2|4.5|
|9.3|8.1|0.3|
|4.5|4.3|2.1|
+---+---+---+
# do the sum of the multiplication that we want, and get
# one data frame for each column
colDFs = []
for c2 in df.columns:
    colDFs.append( df.select( [ F.sum(df[c1]*df[c2]).alias("op_{0}".format(i)) for i,c1 in enumerate(df.columns) ] ) )
# now union those separate data frames to build the "matrix"
mtxDF = reduce(lambda a,b: a.select(a.columns).union(b.select(a.columns)), colDFs )
mtxDF.show()
+------------------+------------------+------------------+
|              op_0|              op_1|              op_2|
+------------------+------------------+------------------+
|            152.45|118.88999999999999|             57.15|
|118.88999999999999|104.94999999999999|             38.93|
|             57.15|             38.93|52.540000000000006|
+------------------+------------------+------------------+

这似乎与您从 numpy 得到的结果相同.
a = numpy.genfromtxt("matrix1.txt")
numpy.dot(a.T, a)
array([[ 152.45,  118.89,   57.15],
       [ 118.89,  104.95,   38.93],
       [  57.15,   38.93,   52.54]])

关于pyspark - PySpark 中的矩阵乘法 A^T * A,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44348527/

相关文章:

python - Pyspark - 在 groupby 和 orderBy 之后选择列中的不同值

apache-spark - 如何刷新 HDFS 路径?

algorithm - 高阶矩阵乘法

python - 将两个矩阵划分为8个小矩阵的更快方法

python - 在python中的kafka Direct Stream中手动提交偏移量

apache-spark - 如何在Spark中计算过去时间值 `percent_rank`?

apache-spark - 如何在 Spark Structured Streaming 中指定批处理间隔?

matrix - 在 Julia 中将矩阵提升为幂

java - java中的并行矩阵乘法

C:将指向结构的指针传递给函数时出现奇怪的行为