我有一个字典类型的 RDD:
>>> a.collect()
[{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]
只是为了检查:
>>> a.map(lambda x:type(x)).collect()
[< type 'dict' >]
但是我无法使用 map()
迭代 dict 类型 RDD。我尝试过:
>>> a.map(lambda x:(k,v) for k,v in x.iteritems())
令我惊讶的是,它会导致错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'x' is not defined
我是否遗漏了任何重要的一点。
编辑:代码没问题,除非与生成器语法相关的小错误,正确的代码应该是:
a.map(lambda x:[(k,v) for k,v in x.iteritems()])
最佳答案
我尝试过这个:
data = [{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4,
(1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]
rdd = sc.parallelize(data)
rdd.flatMap(lambda _: [(k,v) for (k,v) in _.items()]).collect()
得到了这个:
[((1155718, 105), 14),
((1155718, 738), 4),
((1155718, 2904), 38),
((1155718, 1887), 2),
((1155718, 1196), 6),
((1155718, 1930), 12),
((1155718, 927), 6),
((1155718, 2783), 8),
((1155718, 997), 4),
((1155718, 952), 4)]
关于python - PySpark:字典类型 RDD 的迭代,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43613869/