apache-spark - 如何使用 PySpark 正确执行两个 RDD 的完整外连接?

标签 apache-spark mapreduce pyspark apache-spark-sql outer-join

我正在寻找一种通过键组合两个 RDD 的方法。

给定:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
               ]
              )
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
               ]
              )

我找到了解决办法!然而,这个解决方案并不完全满足我想要做的事情。 我创建了一个函数来指定我的 key ,该 key 将应用于名为“x”的rdd:

def get_keys(rdd):

    new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
    return new_x

new_x = get_keys(x)

给出:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]

然后:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()

结果:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]

我想要的是:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]  

帮忙?

最佳答案

为什么不呢?

>>> new_x.fullOuterJoin(y)

>>> x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd

关于apache-spark - 如何使用 PySpark 正确执行两个 RDD 的完整外连接?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40025845/

相关文章:

apache-spark - 在 pyspark 中查找并删除匹配的列值

java - 使用 Apache Spark 迭代并展平数据集中的结构类型数组 :Java

hadoop - Spark 无法再执行作业。执行者创建目录失败

scala - 我可以通过 spark-scala 程序运行 shell 脚本吗?

hadoop - 如何配置 Hive 以使用 Spark?

hadoop - 能否在 Hadoop 集群中的 Map Task 中启动特定进程?

python - PySpark 减少按键?添加键/元组

apache-spark - org.apache.spark.sql.SQLContext 无法加载文件

hadoop - 以编程方式查找正在运行的 Hadoop 作业的失败 TaskAttempts

python - pyspark 1.3.0 将数据框保存到 HIVE 表中