python - 将数据框中带有时间戳的多行事件转换为带有开始和结束日期时间的单行

标签 python apache-spark pyspark

我有单个设备的行,我想对所有按顺序发生的相同事件进行分组。

我也想用 pyspark 做这个

因此给出以下内容:

+--------------------+-------+
|      datetime      | event |
+--------------------+-------+
| 12-02-18T08:20:00  |     1 |
| 12-02-18T08:25:00  |     1 |
| 12-02-18T08:30:00  |     1 |
| 12-02-18T09:00:00  |     2 |
| 12-02-18T09:05:00  |     2 |
| 12-02-18T09:10:00  |     1 |
| 12-02-18T09:15:00  |     1 |
+--------------------+-------+

我想以以下结尾:

+-------------------+-------------------+-------+
|    start_time     |     end_time      | event |
+-------------------+-------------------+-------+
| 12-02-18T08:20:00 | 12-02-18T09:00:00 |     1 |
| 12-02-18T09:00:00 | 12-02-18T09:10:00 |     2 |
| 12-02-18T09:10:00 | null              |     1 |
+-------------------+-------------------+-------+

不会有重叠事件,因此不需要考虑。我教过如何使用 UDF 执行此操作,但想知道是否有人知道更优雅/更有效的方法。

最佳答案

使用 Florian 提供的方法(Window 函数),可以通过在 Scala 上获取具有更改事件的行,然后获取下一个更改日期来完成:

val df = List(
  ("12-02-18T08:20:00", 1),
  ("12-02-18T08:25:00", 1),
  ("12-02-18T08:30:00", 1),
  ("12-02-18T09:00:00", 2),
  ("12-02-18T09:05:00", 2),
  ("12-02-18T09:10:00", 1),
  ("12-02-18T09:15:00", 1)
).toDF("datetime", "event")
df.show(false)

val w = Window.orderBy("datetime")
val changedRowsOnlyDF = df.withColumn("changed", $"event" =!= lag($"event", 1, 0).over(w))
  .where($"changed")

val result = changedRowsOnlyDF
  .withColumn("end_time", lead($"datetime", 1).over(w))
  .drop("changed")
  .withColumnRenamed("datetime", "start_time")
result.show(false)

输出:

+-----------------+-----+-----------------+
|start_time       |event|end_time         |
+-----------------+-----+-----------------+
|12-02-18T08:20:00|1    |12-02-18T09:00:00|
|12-02-18T09:00:00|2    |12-02-18T09:10:00|
|12-02-18T09:10:00|1    |null             |
+-----------------+-----+-----------------+

免责声明:这种方法可用于小数据量,Spark通过消息通知:

WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

关于python - 将数据框中带有时间戳的多行事件转换为带有开始和结束日期时间的单行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52491603/

相关文章:

python - Google Sheets API json 文件 - CLIENT_SECRET 和 oauth2client 凭据之间有什么区别?

python - 从文件创建列表字典

apache-spark - 在Spark中运行任务时发生错误ExecutorLostFailure

amazon-web-services - AWS JupyterHub pyspark notebook 使用 pandas 模块

apache-spark - Pyspark pyspark.rdd.PipelinedRDD 不适用于模型

python - 如何在seaborn fiddle 图上添加阴影?

css - Spark UI 显示格式错误(损坏的 CSS)

apache-spark - kafka - 多个主题与多个分区

python - Pyspark:将列值与另一个值进行比较

python - 数据框 - 对一些列求和,从其他列中获取最后一个值