python - Apache Spark : How to create a matrix from a DataFrame?

标签 python matrix apache-spark pyspark apache-spark-mllib

我在 Apache Spark 中有一个带有整数数组的 DataFrame,源是一组图像。我最终想对其进行 PCA,但我无法从我的数组创建矩阵。如何从 RDD 创建矩阵?

> imagerdd = traindf.map(lambda row: map(float, row.image))
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)
Traceback (most recent call last):

  File "<ipython-input-21-6fdaa8cde069>", line 2, in <module>
mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
values = self._convert_to_array(values, np.float64)

  File     "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
    return np.asarray(array_like, dtype=dtype)

  File "/usr/local/python/conda/lib/python2.7/site-        packages/numpy/core/numeric.py", line 462, in asarray
    return array(a, dtype, copy=False, order=order)

TypeError: float() argument must be a string or a number

我从我能想到的每一种可能的安排中得到同样的错误:

imagerdd = traindf.map(lambda row: Vectors.dense(row.image))
imagerdd = traindf.map(lambda row: row.image)
imagerdd = traindf.map(lambda row: np.array(row.image))

如果我尝试

> imagedf = traindf.select("image")
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf)

追溯(最近的调用最后):

  File "<ipython-input-26-a8cbdad10291>", line 2, in <module>
mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
    values = self._convert_to_array(values, np.float64)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
    return np.asarray(array_like, dtype=dtype)

  File "/usr/local/python/conda/lib/python2.7/site-packages/numpy/core/numeric.py", line 462, in asarray
    return array(a, dtype, copy=False, order=order)

ValueError: setting an array element with a sequence.

最佳答案

由于您没有提供示例输入,我假设它看起来或多或少像这样,其中 id 是行号,image 包含值。

traindf = sqlContext.createDataFrame([
    (1, [1, 2, 3]),
    (2, [4, 5, 6]),
    (3, (7, 8, 9))
], ("id", "image"))

首先您必须了解DenseMatrix 是一个本地 数据结构。准确地说,它是 numpy.ndarray 的包装器。至于现在(Spark 1.4.1),PySpark MLlib 中没有分布式等效项。

密集矩阵采用三个强制参数 numRowsnumColsvalues,其中 values 是本地数据结构。在您的情况下,您必须先收集:

values = (traindf.
    rdd.
    map(lambda r: (r.id, r.image)). # Extract row id and data
    sortByKey(). # Sort by row id
    flatMap(lambda (id, image): image).
    collect())


ncol = len(traindf.rdd.map(lambda r: r.image).first())
nrow = traindf.count()

dm = DenseMatrix(nrow, ncol, values)

最后:

> print dm.toArray()
[[ 1.  4.  7.]
 [ 2.  5.  8.]
 [ 3.  6.  9.]]

编辑:

在 Spark 1.5+ 中,您可以按如下方式使用 mllib.linalg.distributed:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

mat = IndexedRowMatrix(traindf.map(lambda row: IndexedRow(*row)))
mat.numRows()
## 4
mat.numCols()
## 3

尽管就目前而言,API 在实践中的实用性仍然有限。

关于python - Apache Spark : How to create a matrix from a DataFrame?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31567989/

相关文章:

python - 有一种通过按键检查 QCheckBox 的方法吗?

python - 矩阵外的函数输入

c++ - 比较矩阵乘法

apache-spark - 无法在 Spark 结构化流中转换 Kafka Json 数据

apache-spark - 如何将 Spark 数据帧写入 Neo4j 数据库

python - 当您知道列和行引用时如何更改数据框中的字段值

python - 可以在 pytest.ini 中设置测试的任意配置吗?

python - 网格置换算法 - 固定行顺序

python - 在 Python 中使用带有 FUN ="-"的外部函数

apache-spark - 在pyspark中,使用df.write.partitionBy(..).save时如何对某一列的部分值进行分区?