python - PySpark:字典类型 RDD 的迭代

标签 python apache-spark pyspark rdd

我有一个字典类型的 RDD:

>>> a.collect()

[{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]

只是为了检查:

>>> a.map(lambda x:type(x)).collect()

[< type 'dict' >]

但是我无法使用 map() 迭代 dict 类型 RDD。我尝试过:

>>> a.map(lambda x:(k,v) for k,v in x.iteritems())

令我惊讶的是,它会导致错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'x' is not defined

我是否遗漏了任何重要的一点。

编辑:代码没问题,除非与生成器语法相关的小错误,正确的代码应该是:

a.map(lambda x:[(k,v) for k,v in x.iteritems()])

最佳答案

我尝试过这个:

data = [{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, 
         (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]

rdd = sc.parallelize(data) 
rdd.flatMap(lambda _: [(k,v) for (k,v) in _.items()]).collect()

得到了这个:

[((1155718, 105), 14),
 ((1155718, 738), 4),
 ((1155718, 2904), 38),
 ((1155718, 1887), 2),
 ((1155718, 1196), 6),
 ((1155718, 1930), 12),
 ((1155718, 927), 6),
 ((1155718, 2783), 8),
 ((1155718, 997), 4),
 ((1155718, 952), 4)]

关于python - PySpark:字典类型 RDD 的迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43613869/

相关文章:

python - 如何使用计算数据创建历史数据框?

maven - 创建用于提交Spark应用程序的瘦 jar

python - 在 python 中初始化 MSEdge 浏览器,得到 TypeError : Level not an integer or a valid string: None

python - 如何通过将元组更改为...来完成元组上的 fit_transform,或者能够完成数据上的 fit_transform?

apache-spark - 为什么 spark 没有在多个节点上重新分配我的数据帧?

apache-spark - 如何为Spark Streaming定义Kafka(数据源)依赖项?

python - 在所有列聚合后重命名所有列

pyspark - 使用时间戳自行加入 pyspark 数据框

java - PySpark:无法创建 SparkSession。(Java 网关错误)

python - 如何在枕头中使用 alpha_composite?