scala - Spark : Calculate event end time on 30-minute intervals based on start time and duration values in previous rows

标签 scala apache-spark dataframe hadoop apache-spark-sql

我有一个包含 event_time 字段的文件,每条记录每 30 分钟生成一次,并指示事件持续了多少秒。 示例:

Event_time | event_duration_seconds
09:00      | 800
09:30      | 1800
10:00      | 2700
12:00      | 1000
13:00      | 1000

我需要将连续事件转换为仅具有持续时间的事件。输出文件应如下所示:

Event_time_start | event_time_end | event_duration_seconds
09:00            | 11:00          | 5300
12:00            | 12:30          | 1000
13:00            | 13:30          | 1000

Scala Spark 中是否有一种方法可以将数据帧记录与下一个数据帧记录进行比较?

我尝试使用 foreach 循环,但这不是一个好的选择,因为它需要处理大量数据

最佳答案

这不是一个小问题,但这里有一个解决方案,步骤如下:

  1. 创建 UDF 来计算下一个最近的 30 分钟事件结束时间 event_ts_end使用java.time API
  2. 使用窗口函数lag获取上一行的事件时间
  3. 使用when/otherwise生成列event_ts_startnull如果事件时间与上一行的时间差为 30 分钟,则为该值
  4. 使用窗口函数last(event_ts_start, ignoreNulls=true)回填null s 与最后一个 event_ts_start
  5. event_ts_start 对数据进行分组聚合event_durationevent_ts_end

首先,让我们组装一个示例数据集:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  (101, "2019-04-01 09:00", 800),
  (101, "2019-04-01 09:30", 1800),
  (101, "2019-04-01 10:00", 2700),
  (101, "2019-04-01 12:00", 1000),
  (101, "2019-04-01 13:00", 1000),
  (220, "2019-04-02 10:00", 1500),
  (220, "2019-04-02 10:30", 2400)
).toDF("event_id", "event_time", "event_duration")

请注意,示例数据集已稍微概括为包含多个单个事件,并使事件时间包含 date信息涵盖跨越给定日期的事件的情况。

步骤1 :

import java.sql.Timestamp

def get_next_closest(seconds: Int) = udf{ (ts: Timestamp, duration: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val iter = Iterator.iterate(ts.toLocalDateTime)(_.plusSeconds(seconds)).
    dropWhile(_.isBefore(ts.toLocalDateTime.plusSeconds(duration)))

  iter.next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
}

步骤2 - 5 :

val winSpec = Window.partitionBy("event_id").orderBy("event_time")

val seconds = 30 * 60

df.
  withColumn("event_ts", to_timestamp($"event_time", "yyyy-MM-dd HH:mm")).
  withColumn("event_ts_end", get_next_closest(seconds)($"event_ts", $"event_duration")).
  withColumn("prev_event_ts", lag($"event_ts", 1).over(winSpec)).
  withColumn("event_ts_start",  when($"prev_event_ts".isNull ||
    unix_timestamp($"event_ts") - unix_timestamp($"prev_event_ts") =!= seconds, $"event_ts"
  )).
  withColumn("event_ts_start", last($"event_ts_start", ignoreNulls=true).over(winSpec)).
  groupBy($"event_id", $"event_ts_start").agg(
    sum($"event_duration").as("event_duration"), max($"event_ts_end").as("event_ts_end")
  ).show
// +--------+-------------------+--------------+-------------------+
// |event_id|     event_ts_start|event_duration|       event_ts_end|
// +--------+-------------------+--------------+-------------------+
// |     101|2019-04-01 09:00:00|          5300|2019-04-01 11:00:00|
// |     101|2019-04-01 12:00:00|          1000|2019-04-01 12:30:00|
// |     101|2019-04-01 13:00:00|          1000|2019-04-01 13:30:00|
// |     220|2019-04-02 10:00:00|          3900|2019-04-02 11:30:00|
// +--------+-------------------+--------------+-------------------+

关于scala - Spark : Calculate event end time on 30-minute intervals based on start time and duration values in previous rows,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55697170/

相关文章:

apache-spark - Spark : processing multiple kafka topic in parallel

python - 合并数据框而不重复列

r - 计算数据框中自开始以来的月数

scala - Scala 中的柯里化(Currying)与匿名函数

scala - 如何在scala中执行命令?

scala - 如何在 Scala 中将数字格式化为百分比?

python - Spark Dataframe 区分名称重复的列

scala - 免费 ~> 蹦床 : recursive program crashes with OutOfMemoryError

apache-spark - Spark 在使用 Docker Mesos 集群进行身份验证时挂起

python - 从列表中添加数据框中的列