我正在寻找一种通过键组合两个 RDD 的方法。
给定:
x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
]
)
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
]
)
我找到了解决办法!然而,这个解决方案并不完全满足我想要做的事情。 我创建了一个函数来指定我的 key ,该 key 将应用于名为“x”的rdd:
def get_keys(rdd):
new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
return new_x
new_x = get_keys(x)
给出:
[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]
然后:
new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()
结果:
[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]
我想要的是:
[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]
帮忙?
最佳答案
为什么不呢?
>>> new_x.fullOuterJoin(y)
或
>>> x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd
关于apache-spark - 如何使用 PySpark 正确执行两个 RDD 的完整外连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40025845/