apache-spark - Spark Dataframe 中基于行值序列的复杂行分组

标签 apache-spark apache-spark-sql

我有一组行,其中每个事件行均由“EventId”唯一标识。一组事件属于一个组,由“GroupId”标识。

"BeginEndMarker"= 1 是可能的开始事件。 "BeginEndMarker"= 5 是可能的结束事件。 "BeginEndMarker"= -1 是中间事件。

示例:

val df= Seq(
("GroupId1", "WF1", 1, "01-01-2023"),
("GroupId1", "WF2", -1, "01-02-2023"),
("GroupId1", "WF3", -1, "01-03-2023"),
("GroupId1", "WF4", 5, "01-04-2023"),
("GroupId1", "WF5", 5, "01-05-2023"),
("GroupId1", "WF6", 1, "01-06-2023"),
("GroupId1", "WF7", 1, "01-06-2023"),
("GroupId1", "WF8", -1, "01-07-2023"),
("GroupId1", "WF9", 5, "01-08-2023"),
("GroupId1", "WF10", 1, "01-09-2023"),
("GroupId1", "WF11", -1, "01-10-2023"),
).toDF("GroupId", "EventId","BeginEndMarker","Time")
df.show

+--------+-------+--------------+----------+
| GroupId|EventId|BeginEndMarker|      Time|
+--------+-------+--------------+----------+
|GroupId1|    WF1|             1|01-01-2023|
|GroupId1|    WF2|            -1|01-02-2023|
|GroupId1|    WF3|            -1|01-03-2023|
|GroupId1|    WF4|             5|01-04-2023|
|GroupId1|    WF5|             5|01-05-2023|
|GroupId1|    WF6|             1|01-06-2023|
|GroupId1|    WF7|             1|01-06-2023|
|GroupId1|    WF8|            -1|01-07-2023|
|GroupId1|    WF9|             5|01-08-2023|
|GroupId1|   WF10|             1|01-09-2023|
|GroupId1|   WF11|            -1|01-10-2023|
+--------+-------+--------------+----------+

这些事件行需要进行分组,以便每个子组以第一次出现“BeginEndMarker”=1 开始,该子组的结尾是最后一个“BeginEndMarker”=5,下一个事件为“BeginEndMarker”=1。 子组可能没有“BeginEndMarker”= 5 的事件,因为它可能是不完整的子组。

预期结果应该是:

+--------+-------+--------------+----------+--------+
| GroupId|EventId|BeginEndMarker|      Time|Subgroup|
+--------+-------+--------------+----------+--------+
|GroupId1|    WF1|             1|01-01-2023|     SG1|
|GroupId1|    WF2|            -1|01-02-2023|     SG1|
|GroupId1|    WF3|            -1|01-03-2023|     SG1|
|GroupId1|    WF4|             5|01-04-2023|     SG1|
|GroupId1|    WF5|             5|01-05-2023|     SG1|
|GroupId1|    WF6|             1|01-06-2023|     SG2|
|GroupId1|    WF7|             1|01-06-2023|     SG2|
|GroupId1|    WF8|            -1|01-07-2023|     SG2|
|GroupId1|    WF9|             5|01-08-2023|     SG2|
|GroupId1|   WF10|             1|01-09-2023|     SG3|
|GroupId1|   WF11|            -1|01-10-2023|     SG3|
+--------+-------+--------------+----------+--------+

知道如何在 Spark 中实现这一点而不使用 UDF 吗?

最佳答案

这是一个有趣的小谜题。我认为这可以通过评论建议的窗口函数来解决。 那么,我从你的例子中得出的结论是:

  • 1 个是开始事件,5 个是结束事件。 -1 可以忽略以确定分组。
  • 这些值需要按 EventId 排序才能解释 BeginMarkers 的顺序
  • 开始/结束事件可以有多个,在这种情况下,请使用第一个开始事件作为新组的开始。
  • 需要关注的是从 stae 5 到 1 的转换,因为这是创建新组(第一组除外)的触发器

我认为解决方案看起来像这样:

from pyspark.sql import Window
import pyspark.sql.functions as f

w_events = Window.partitionBy("GroupId).orderBy("EventId")
w_cumsum = w_events.rowsBetween(Window.unboundedPreceding, 0)

grouped_events = (events
  .withColumn("is_new_event_start", f.when(f.col("BeginEndMarker") == 1 && f.lag("BeginEndMarker", 1).over(w_events) == 5, 1).otherwise(0))
  .withColumn("group_number", f.sum("is_new_event_start").over(w_cumsum))
  .withColumn("Subgroup", f.concat(f.lit("SG"), f.col("group_number"))
)

我没有测试上面的代码,但类似的东西应该可以工作。您创建一个窗口来指示顺序,并使用 lag 函数来检测从状态 5 到 1 的转换。您创建一个新列以用“1”指示新组的开始位置。然后,您对新创建的列进行累积和,每次有新事件时,子组 ID 都会增加 1。

附注该代码可能不适用于第一行,因为在第一次开始之前没有结束。您可以通过给出 lag function 来解决这个问题默认值也许是 5

关于apache-spark - Spark Dataframe 中基于行值序列的复杂行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76234799/

相关文章:

scala - 使用 Spark Dataframe scala 将多个不同列转换为 Map 列

apache-spark - Apache Spark : Dag is not executed twice for reduceByKey

apache-spark - "predicate pushdown"和 "projection pushdown"有什么区别?

java - 对多列进行分组而不进行聚合

apache-spark - Spark for Python - 无法将字符串列转换为十进制/ double

apache-spark - 替换深层嵌套模式 Spark Dataframe 中的值

python - 如何从pyspark中的数组中提取元素

apache-spark - 分组以将 hive 中的多列值合并为一列

apache-spark - 使用 Pyspark 与 Hbase 交互的最佳方式是什么

apache-spark - Spark 失败,因为 S3 文件已更新。如何消除这个错误呢?