我有一组行,其中每个事件行均由“EventId”唯一标识。一组事件属于一个组,由“GroupId”标识。
"BeginEndMarker"= 1
是可能的开始事件。
"BeginEndMarker"= 5
是可能的结束事件。
"BeginEndMarker"= -1
是中间事件。
示例:
val df= Seq(
("GroupId1", "WF1", 1, "01-01-2023"),
("GroupId1", "WF2", -1, "01-02-2023"),
("GroupId1", "WF3", -1, "01-03-2023"),
("GroupId1", "WF4", 5, "01-04-2023"),
("GroupId1", "WF5", 5, "01-05-2023"),
("GroupId1", "WF6", 1, "01-06-2023"),
("GroupId1", "WF7", 1, "01-06-2023"),
("GroupId1", "WF8", -1, "01-07-2023"),
("GroupId1", "WF9", 5, "01-08-2023"),
("GroupId1", "WF10", 1, "01-09-2023"),
("GroupId1", "WF11", -1, "01-10-2023"),
).toDF("GroupId", "EventId","BeginEndMarker","Time")
df.show
+--------+-------+--------------+----------+
| GroupId|EventId|BeginEndMarker| Time|
+--------+-------+--------------+----------+
|GroupId1| WF1| 1|01-01-2023|
|GroupId1| WF2| -1|01-02-2023|
|GroupId1| WF3| -1|01-03-2023|
|GroupId1| WF4| 5|01-04-2023|
|GroupId1| WF5| 5|01-05-2023|
|GroupId1| WF6| 1|01-06-2023|
|GroupId1| WF7| 1|01-06-2023|
|GroupId1| WF8| -1|01-07-2023|
|GroupId1| WF9| 5|01-08-2023|
|GroupId1| WF10| 1|01-09-2023|
|GroupId1| WF11| -1|01-10-2023|
+--------+-------+--------------+----------+
这些事件行需要进行分组,以便每个子组以第一次出现“BeginEndMarker”=1 开始,该子组的结尾是最后一个“BeginEndMarker”=5,下一个事件为“BeginEndMarker”=1。 子组可能没有“BeginEndMarker”= 5 的事件,因为它可能是不完整的子组。
预期结果应该是:
+--------+-------+--------------+----------+--------+
| GroupId|EventId|BeginEndMarker| Time|Subgroup|
+--------+-------+--------------+----------+--------+
|GroupId1| WF1| 1|01-01-2023| SG1|
|GroupId1| WF2| -1|01-02-2023| SG1|
|GroupId1| WF3| -1|01-03-2023| SG1|
|GroupId1| WF4| 5|01-04-2023| SG1|
|GroupId1| WF5| 5|01-05-2023| SG1|
|GroupId1| WF6| 1|01-06-2023| SG2|
|GroupId1| WF7| 1|01-06-2023| SG2|
|GroupId1| WF8| -1|01-07-2023| SG2|
|GroupId1| WF9| 5|01-08-2023| SG2|
|GroupId1| WF10| 1|01-09-2023| SG3|
|GroupId1| WF11| -1|01-10-2023| SG3|
+--------+-------+--------------+----------+--------+
知道如何在 Spark 中实现这一点而不使用 UDF 吗?
最佳答案
这是一个有趣的小谜题。我认为这可以通过评论建议的窗口函数来解决。 那么,我从你的例子中得出的结论是:
- 1 个是开始事件,5 个是结束事件。 -1 可以忽略以确定分组。
- 这些值需要按 EventId 排序才能解释 BeginMarkers 的顺序
- 开始/结束事件可以有多个,在这种情况下,请使用第一个开始事件作为新组的开始。
- 需要关注的是从 stae 5 到 1 的转换,因为这是创建新组(第一组除外)的触发器
我认为解决方案看起来像这样:
from pyspark.sql import Window
import pyspark.sql.functions as f
w_events = Window.partitionBy("GroupId).orderBy("EventId")
w_cumsum = w_events.rowsBetween(Window.unboundedPreceding, 0)
grouped_events = (events
.withColumn("is_new_event_start", f.when(f.col("BeginEndMarker") == 1 && f.lag("BeginEndMarker", 1).over(w_events) == 5, 1).otherwise(0))
.withColumn("group_number", f.sum("is_new_event_start").over(w_cumsum))
.withColumn("Subgroup", f.concat(f.lit("SG"), f.col("group_number"))
)
我没有测试上面的代码,但类似的东西应该可以工作。您创建一个窗口来指示顺序,并使用 lag
函数来检测从状态 5 到 1 的转换。您创建一个新列以用“1”指示新组的开始位置。然后,您对新创建的列进行累积和,每次有新事件时,子组 ID 都会增加 1。
附注该代码可能不适用于第一行,因为在第一次开始之前没有结束。您可以通过给出 lag function 来解决这个问题默认值也许是 5
关于apache-spark - Spark Dataframe 中基于行值序列的复杂行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76234799/