python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?

标签 python apache-spark pyspark databricks

我有以下格式的 PySpark 数据框:

+-------+----------+---------------------+
| event | consumer |      timestamp      |
+-------+----------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 |
| E     |        1 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 |
| T     |        2 | 2020-09-09 13:20:00 |
| E     |        2 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+

有没有一种方法可以遍历由 consumer 分区并按 timestamp 排序的组并将值设置为新列?

新列将定义 session_timestamp。这就是它背后的逻辑:

  • session 仅以事件 E 开始。
  • 如果在 session 开始后一小时内发生新事件,则它属于该 session 。
  • 如果某个事件发生的时间超过启动 session 的事件的一个小时,则它属于另一个 session (这就是 DataFrame 中第 2 行和第 3 行之间发生的情况)。

所以上面 Dataframe 的结果是:

+-------+----------+---------------------+---------------------+
| event | consumer |      timestamp      |  session_timestamp  |
+-------+----------+---------------------+---------------------+
| E     |        1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E     |        1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T     |        1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T     |        2 | 2020-09-09 13:20:00 | Null                |
| E     |        2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E     |        2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T     |        2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+

有没有办法在 Pyspark 上做到这一点?

最佳答案

正如@Ofek 在评论中所说,window功能会帮助你。这里给你一个scala的例子,你可以自己用python重写。 (考虑到pyspark中用户定义的聚合函数并不容易,这里收集并使用udf处理它)

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = <your-dataframe>

val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
  val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  var result: Date = null
  for (row <- rows.reverse) {
    val event = row(0)
    val time = parser.parse(row(1).toString)
    if (event == "E") {
      if (result == null || result.getTime - time.getTime < 3600000) {
        result = time
      }
    }
  }
  if (result == null)
    null
  else
    parser.format(result)
})

df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
  .partitionBy($"consumer")
  .orderBy($"timestamp")))
  .withColumn("session_timestamp", findSessionStartTime($"events"))
  .drop("events")
  .show(false)

结果如下:

(此外,您描述的示例结果不正确。2020-09-09 14:20:002020-09-09 13:30:00 之间的时间是 50 分钟 < 1 小时)

+-----+--------+-------------------+-------------------+
|event|consumer|timestamp          |session_timestamp  |
+-----+--------+-------------------+-------------------+
|E    |1       |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E    |1       |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T    |1       |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T    |2       |2020-09-09 13:20:00|null               |
|E    |2       |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E    |2       |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T    |2       |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+

关于python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64374547/

相关文章:

python - 将 python 变量插入 mysql 表的行中

scala - 使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala

java - 对象工具不是包 scala 的成员

hadoop - Spark - Snappy 库不可用

apache-spark - 高效地批处理 Spark 数据帧以调用 API

python - 使用 Selenium 的 text 和 innerHTML 之间的区别

python - Django NoReverseMatch 位于/

python - 通过遍历多个关系(外键到外键?)来过滤 Django 表单字段

ssh - Spark worker 不会绑定(bind)到 master

apache-spark - Apache Spark 用户推荐?