python - 是否可以在 leftOuterJoin 上初始化一个空的默认值?

标签 python apache-spark pyspark

我有以下两个rdd:

name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])])

name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"])

我想加入他们,所以我得到的 rdd 看起来像这样:

[('Amy', [7, 8, 7, 18, 19]), ('Chris', []), ('Brian', []), ('Dan', [6, 7]), ('Emily', [1, 2, 3, 7, 7, 7, 2])]

我可以通过我认为笨拙的解决方案来实现这一目标:

from pyspark import SparkContext

sc = SparkContext()

name_to_hour = sc.parallelize([("Amy", [7,8,7,18,19]), ("Dan", [6,7]), ("Emily", [1,2,3,7,7,7,2])])

name_biz = sc.parallelize(["Amy", "Brian", "Chris", "Dan", "Emily"])

temp = name_biz.map(lambda x: (x, []))

joined_rdd = temp.leftOuterJoin(name_to_hour)

def concat(my_tup):
    if my_tup[1] is None:
        return []
    else:
        return my_tup[1]

result_rdd = joined_rdd.map(lambda x: (x[0], concat(x[1])))

print "\033[0;34m{}\033[0m".format(result_rdd.collect())

有更好的方法吗?

我在想,如果可以以某种方式在 leftOuterJoin 上指定,那么非空字段将保留它们在 name_to_hour 中的内容,而空字段将获得默认值 [],我的问题可以更容易解决,但我不认为有这样的方法。

最佳答案

解决此问题的一种方法是利用 Python 列表的字典顺序。由于空列表总是“小于”非空列表,我们可以简单地创建一个union并使用max减少:

temp.union(name_to_hour).reduceByKey(max)

这当然假设 key 是唯一的。

关于python - 是否可以在 leftOuterJoin 上初始化一个空的默认值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37421994/

相关文章:

python - 创建带有增量计数器的列,用于识别 Pandas 中的重复集

python - 如何在Python中使用pyqt创建饼图

sql - 如何在 FROM 语句上使用 header 加载 SparkSQL

pyspark - 在Spark中将日期转换为月末

scala - 如何在 Scala Spark 项目中使用 PySpark UDF?

python - 为什么 python 语句 `if something is None` 比 `if not something` 运行得快得多?

python - 根据条件在 Seaborn 中绘制彩色密度图

hadoop - 在Shark Hive中创建连接两个现有表的表

python - 将 Apache Spark Scala 代码转换为 Python

python - Pyspark:将参数传递给数据帧中的字符串列