如果至少满足两个条件之一,我想加入两个 pyspark 数据帧。
玩具数据:
df1 = spark.createDataFrame([
(10, 1, 666),
(20, 2, 777),
(30, 1, 888),
(40, 3, 999),
(50, 1, 111),
(60, 2, 222),
(10, 4, 333),
(50, None, 444),
(10, 0, 555),
(50, 0, 666)
],
['var1', 'var2', 'other_var']
)
df2 = spark.createDataFrame([
(10, 1),
(20, 2),
(30, None),
(30, 0)
],
['var1_', 'var2_']
)
我想维护 df1
的所有行,其中 var1
出现在 df2.var1_
的不同值中 OR var2
出现在 df2.var2_
的不同值中(但在该值为 0 的情况下则不存在)。
所以,预期的输出是
+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
| 10| 1| 666| 10| 1| # join on both var1 and var2
| 20| 2| 777| 20| 2| # join on both var1 and var2
| 30| 1| 888| 10| 1| # join on both var1 and var2
| 50| 1| 111| 10| 1| # join on var2
| 60| 2| 222| 20| 2| # join on var2
| 10| 4| 333| 10| 1| # join on var1
| 10| 0| 555| 10| 1| # join on var1
+----+----+---------+-----+-----+
在其他尝试中,我尝试过
cond = [(df1.var1 == (df2.select('var1_').distinct()).var1_) | (df1.var2 == (df2.filter(F.col('var2_') != 0).select('var2_').distinct()).var2_)]
df1\
.join(df2, how='inner', on=cond)\
.show()
+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
| 10| 1| 666| 10| 1|
| 20| 2| 777| 20| 2|
| 30| 1| 888| 10| 1|
| 50| 1| 111| 10| 1|
| 30| 1| 888| 30| null|
| 30| 1| 888| 30| 0|
| 60| 2| 222| 20| 2|
| 10| 4| 333| 10| 1|
| 10| 0| 555| 10| 1|
| 10| 0| 555| 30| 0|
| 50| 0| 666| 30| 0|
+----+----+---------+-----+-----+
但我获得的行数比预期多,并且 var2 == 0
的行也被保留。
我做错了什么?
注意:我没有使用 .isin
方法,因为我实际的 df2
有大约 20k 行,并且我已阅读 here这种具有大量 ID 的方法可能会产生较差的性能。
最佳答案
尝试以下条件:
cond = (df2.var2_ != 0) & ((df1.var1 == df2.var1_) | (df1.var2 == df2.var2_))
df1\
.join(df2, how='inner', on=cond)\
.show()
+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
| 10| 1| 666| 10| 1|
| 30| 1| 888| 10| 1|
| 20| 2| 777| 20| 2|
| 50| 1| 111| 10| 1|
| 60| 2| 222| 20| 2|
| 10| 4| 333| 10| 1|
| 10| 0| 555| 10| 1|
+----+----+---------+-----+-----+
条件应仅包含要连接的两个数据帧中的列。如果您想删除 var2_ = 0
,您可以将它们作为连接条件,而不是作为过滤器。
也不需要指定distinct
,因为它不影响相等条件,而且还增加了不必要的步骤。
关于python - pyspark - 使用 OR 条件连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66687697/