python - pyspark - 使用 OR 条件连接

标签 python dataframe apache-spark join pyspark

如果至少满足两个条件之一,我想加入两个 pyspark 数据帧。

玩具数据:

df1 = spark.createDataFrame([
    (10, 1, 666),
    (20, 2, 777),
    (30, 1, 888),
    (40, 3, 999),
    (50, 1, 111),
    (60, 2, 222),
    (10, 4, 333),
    (50, None, 444),
    (10, 0, 555),
    (50, 0, 666)
    ],
    ['var1', 'var2', 'other_var'] 
)

df2 = spark.createDataFrame([
    (10, 1),
    (20, 2),
    (30, None),
    (30, 0)
    ],
    ['var1_', 'var2_'] 
)

我想维护 df1 的所有行,其中 var1 出现在 df2.var1_ 的不同值中 OR var2 出现在 df2.var2_ 的不同值中(但在该值为 0 的情况下则不存在)。

所以,预期的输出是

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|   # join on both var1 and var2
|  20|   2|      777|   20|    2|   # join on both var1 and var2
|  30|   1|      888|   10|    1|   # join on both var1 and var2
|  50|   1|      111|   10|    1|   # join on var2
|  60|   2|      222|   20|    2|   # join on var2
|  10|   4|      333|   10|    1|   # join on var1
|  10|   0|      555|   10|    1|   # join on var1
+----+----+---------+-----+-----+

在其他尝试中,我尝试过

cond = [(df1.var1 == (df2.select('var1_').distinct()).var1_) | (df1.var2 == (df2.filter(F.col('var2_') != 0).select('var2_').distinct()).var2_)]
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  20|   2|      777|   20|    2|
|  30|   1|      888|   10|    1|
|  50|   1|      111|   10|    1|
|  30|   1|      888|   30| null|
|  30|   1|      888|   30|    0|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
|  10|   0|      555|   30|    0|
|  50|   0|      666|   30|    0|
+----+----+---------+-----+-----+

但我获得的行数比预期多,并且 var2 == 0 的行也被保留。

我做错了什么?

注意:我没有使用 .isin 方法,因为我实际的 df2 有大约 20k 行,并且我已阅读 here这种具有大量 ID 的方法可能会产生较差的性能。

最佳答案

尝试以下条件:

cond = (df2.var2_ != 0) & ((df1.var1 == df2.var1_) | (df1.var2 == df2.var2_))
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  30|   1|      888|   10|    1|
|  20|   2|      777|   20|    2|
|  50|   1|      111|   10|    1|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
+----+----+---------+-----+-----+

条件应仅包含要连接的两个数据帧中的列。如果您想删除 var2_ = 0,您可以将它们作为连接条件,而不是作为过滤器。

也不需要指定distinct,因为它不影响相等条件,而且还增加了不必要的步骤。

关于python - pyspark - 使用 OR 条件连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66687697/

相关文章:

python - QListView 不接受使用 Python PyQt4 的 QMainWindow 中的放置

python - 如何遍历 Pandas 数据框+索引中除最后一列以外的所有列?

python - TypeError : “' int' object is not callable”, 'occurred at index 0'

apache-spark - 如何检索输出大小和从 Spark UI 写入的记录等指标?

python - 在 Python 中,如何自然地对字母数字字符串列表进行排序,使字母字符排在数字字符之前?

python - cx_freeze 和捆绑文件

python - 如果变量不存在,如何检查实例是否存在?

r - 如何从列表中向 R 数据框添加多列

apache-spark - 我确实通过hiveql更改了表。然后用spark-sql显示表是行不通的。错误:路径不存在

java - Apache Spark - JavaSparkContext 无法转换为 SparkContext 错误