python - 遍历 Spark RDD

标签 python vector apache-spark pyspark

从 Spark DataFrame 开始创建向量矩阵以进行进一步的分析处理。

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()

输出是一个向量数组。其中一些向量中有一个空值

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])

据此我想遍历向量矩阵并创建一个 LabeledPoint 数组,如果向量包含 null,则为 0(零),否则为 1。

def f(row):
    if row.contain(None):
       LabeledPoint(1.0,row)
    else:
       LabeledPoint(0.0,row)

我尝试使用

遍历向量矩阵
feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) #   create a generator of row sums
next(feature_matrix_labeledPoint) # Run the iteration protocol

但这行不通。

TypeError: 'PipelinedRDD' object is not iterable

任何帮助都会很棒

最佳答案

RDD 并不是 Python 列表的替代品。您必须使用给定 RDD 上可用的操作或转换。在这里你可以简单地使用map:

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint


feature_matrix_vectors = sc.parallelize([
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])

(feature_matrix_vectors
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
    .collect())

关于python - 遍历 Spark RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31115347/

相关文章:

python - 如果不同数据帧的项目相等但尊重 Pandas 的条件,则打印值

python - 如何使用积分图像检测 ROI 内的强度变化?

python - 如何在 Python 中编写一个空的缩进 block ?

vector - 支持矢量扩展的 RISC-V 仿真器

hadoop - Oozie 无法检测 XML 中的 Spark workflow-app 标签

Python-opencv : Read image data from stdin

math - 合并二维线段

c++ - vector 分配/插入和转换值分配/插入

java - 如何配置连接到 AWS EMR spark 集群的 Java 客户端

python - 无法将 Spark 添加到 PYTHONPATH