我有单个设备的行,我想对所有按顺序发生的相同事件进行分组。
我也想用 pyspark 做这个
因此给出以下内容:
+--------------------+-------+
| datetime | event |
+--------------------+-------+
| 12-02-18T08:20:00 | 1 |
| 12-02-18T08:25:00 | 1 |
| 12-02-18T08:30:00 | 1 |
| 12-02-18T09:00:00 | 2 |
| 12-02-18T09:05:00 | 2 |
| 12-02-18T09:10:00 | 1 |
| 12-02-18T09:15:00 | 1 |
+--------------------+-------+
我想以以下结尾:
+-------------------+-------------------+-------+
| start_time | end_time | event |
+-------------------+-------------------+-------+
| 12-02-18T08:20:00 | 12-02-18T09:00:00 | 1 |
| 12-02-18T09:00:00 | 12-02-18T09:10:00 | 2 |
| 12-02-18T09:10:00 | null | 1 |
+-------------------+-------------------+-------+
不会有重叠事件,因此不需要考虑。我教过如何使用 UDF 执行此操作,但想知道是否有人知道更优雅/更有效的方法。
最佳答案
使用 Florian 提供的方法(Window 函数),可以通过在 Scala 上获取具有更改事件的行,然后获取下一个更改日期来完成:
val df = List(
("12-02-18T08:20:00", 1),
("12-02-18T08:25:00", 1),
("12-02-18T08:30:00", 1),
("12-02-18T09:00:00", 2),
("12-02-18T09:05:00", 2),
("12-02-18T09:10:00", 1),
("12-02-18T09:15:00", 1)
).toDF("datetime", "event")
df.show(false)
val w = Window.orderBy("datetime")
val changedRowsOnlyDF = df.withColumn("changed", $"event" =!= lag($"event", 1, 0).over(w))
.where($"changed")
val result = changedRowsOnlyDF
.withColumn("end_time", lead($"datetime", 1).over(w))
.drop("changed")
.withColumnRenamed("datetime", "start_time")
result.show(false)
输出:
+-----------------+-----+-----------------+
|start_time |event|end_time |
+-----------------+-----+-----------------+
|12-02-18T08:20:00|1 |12-02-18T09:00:00|
|12-02-18T09:00:00|2 |12-02-18T09:10:00|
|12-02-18T09:10:00|1 |null |
+-----------------+-----+-----------------+
免责声明:这种方法可用于小数据量,Spark通过消息通知:
WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
关于python - 将数据框中带有时间戳的多行事件转换为带有开始和结束日期时间的单行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52491603/