python - 使用条件结果列连接 PySpark 数据框

标签 python apache-spark apache-spark-sql pyspark

我有这些表:

df1                  df2
+---+------------+   +---+---------+
| id|   many_cols|   | id|criterion|
+---+------------+   +---+---------+
|  1|lots_of_data|   |  1|    false|
|  2|lots_of_data|   |  1|     true|
|  3|lots_of_data|   |  1|     true|
+---+------------+   |  3|    false|
                     +---+---------+

我打算在 df1 中创建额外的列:

+---+------------+------+
| id|   many_cols|result|
+---+------------+------+
|  1|lots_of_data|     1|
|  2|lots_of_data|  null|
|  3|lots_of_data|     0|
+---+------------+------+
如果df2
中有对应的true

result应该是1 如果df2
中没有对应的trueresult应该是0 如果df2

中没有对应的idresult应该是null

我想不出一个有效的方法来做到这一点。加入后我只坚持第三个条件:

df = df1.join(df2, 'id', 'full')
df.show()

#  +---+------------+---------+
#  | id|   many_cols|criterion|
#  +---+------------+---------+
#  |  1|lots_of_data|    false|
#  |  1|lots_of_data|     true|
#  |  1|lots_of_data|     true|
#  |  3|lots_of_data|    false|
#  |  2|lots_of_data|     null|
#  +---+------------+---------+

PySpark 数据帧是这样创建的:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df1cols = ['id', 'many_cols']
df1data = [(1, 'lots_of_data'),
           (2, 'lots_of_data'),
           (3, 'lots_of_data')]
df2cols = ['id', 'criterion']
df2data = [(1, False),
           (1, True),
           (1, True),
           (3, None)]
df1 = spark.createDataFrame(df1data, df1cols)
df2 = spark.createDataFrame(df2data, df2cols)

最佳答案

一个简单的方法是对 df2 进行分组,通过 iddf1 的连接获得最大的 criterion ,这样你就可以减少加入的行数。如果至少有一个对应的真值,则 bool 列的最大值为真:

from pyspark.sql import functions as F

df2_group = df2.groupBy("id").agg(F.max("criterion").alias("criterion"))

result = df1.join(df2_group, ["id"], "left").withColumn(
    "result",
    F.col("criterion").cast("int")
).drop("criterion")

result.show()
#+---+------------+------+
#| id|   many_cols|result|
#+---+------------+------+
#|  1|lots_of_data|     1|
#|  3|lots_of_data|     0|
#|  2|lots_of_data|  null|
#+---+------------+------+

关于python - 使用条件结果列连接 PySpark 数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66429301/

相关文章:

java - 重命名后如何检查 Spark 列的相等性

scala - 线程 "main"java.lang.NoClassDefFoundError : org/apache/spark/sql/catalyst/StructFilters in spark scala application in intellij 中出现异常

Python 请求 : Handling JSON response, 存储到列表或字典?

Python:使用 pandas 导入 csv。尝试绘制一列,但出现错误,提示 "no numerical data to plot"

elasticsearch - 如何将数据从Cassandra复制到Elasticsearch?

python - PySpark:获取数据框中每列的第一个非空值

apache-spark-sql - 从 spark sql 中的时间间隔中提取 HOUR

python - 我如何检查 2 个多项式是否同余模 (h(x), n)?

python - 解决 I/O 限制问题的最佳方法?

scala - 如何查找 Spark 中每个分区的总和