apache-spark - Spark 窗口函数中的条件

标签 apache-spark pyspark apache-spark-sql window-functions

我有一个类似的数据框

+---+---+---+---+
|  q|  w|  e|  r|
+---+---+---+---+
|  a|  1| 20|  y|
|  a|  2| 22|  z|
|  b|  3| 10|  y|
|  b|  4| 12|  y|
+---+---+---+---+

我想用最小的 er = z 标记行。如果没有具有 r = z 的行,我想要具有最小 e 的行,即使 r = y 也是如此。 本质上,像

+---+---+---+---+---+
|  q|  w|  e|  r|  t|
+---+---+---+---+---+
|  a|  1| 20|  y|  0|
|  a|  2| 22|  z|  1|
|  b|  3| 10|  y|  1|
|  b|  4| 12|  y|  0|
+---+---+---+---+---+

我可以使用多个连接来做到这一点,但是那太昂贵了。 所以我一直在寻找基于窗口的解决方案。

最佳答案

您可以为具有 r = z 的行计算一次每组的最小值,然后为一个组中的所有行计算一次。然后可以将第一个非空值与 e 进行比较:

from pyspark.sql import functions as F
from pyspark.sql import Window

df = ...

w = Window.partitionBy("q")
#When ordering is not defined, an unbounded window frame is used by default.

df.withColumn("min_e_with_r_eq_z", F.expr("min(case when r='z' then e else null end)").over(w)) \
    .withColumn("min_e_overall", F.min("e").over(w)) \
    .withColumn("t", F.coalesce("min_e_with_r_eq_z","min_e_overall") == F.col("e")) \
    .orderBy("w") \
    .show()

输出:

+---+---+---+---+-----------------+-------------+-----+
|  q|  w|  e|  r|min_e_with_r_eq_z|min_e_overall|    t|
+---+---+---+---+-----------------+-------------+-----+
|  a|  1| 20|  y|               22|           20|false|
|  a|  2| 22|  z|               22|           20| true|
|  b|  3| 10|  y|             null|           10| true|
|  b|  4| 12|  y|             null|           10|false|
+---+---+---+---+-----------------+-------------+-----+

注意:我假设 q 是窗口的分组列。

关于apache-spark - Spark 窗口函数中的条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67514479/

相关文章:

java - 检测到逻辑计划之间 INNER 连接的隐式笛卡尔积

java - 传入参数的匿名方法

scala - Spark 提交错误 : Cannot load main class from JAR file

python - Apache pyspark 使用 oracle jdbc 拉取数据。找不到驱动程序

python - Pyspark-当从已经具有(错误)模式的Parquet文件读取时,如何强制spark再次推断模式?

apache-spark - 将多个原始文件合并为单个 Parquet 文件

mysql - 使用spark远程mysql数据库访问错误

java - 如何在spark sql中将json数组<String>转换为csv

java - awaitResult 抛出 Kafka Spark Streaming 异常

java - 如何在 Web 上运行 Apache Spark 作业后获取输出