我有以下格式的 PySpark 数据框:
+-------+----------+---------------------+
| event | consumer | timestamp |
+-------+----------+---------------------+
| E | 1 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 |
| E | 1 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 |
| T | 2 | 2020-09-09 13:20:00 |
| E | 2 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 |
+-------+----------+---------------------+
有没有一种方法可以遍历由 consumer
分区并按 timestamp
排序的组并将值设置为新列?
新列将定义 session_timestamp
。这就是它背后的逻辑:
- session 仅以事件
E
开始。 - 如果在 session 开始后一小时内发生新事件,则它属于该 session 。
- 如果某个事件发生的时间超过启动 session 的事件的一个小时,则它属于另一个 session (这就是 DataFrame 中第 2 行和第 3 行之间发生的情况)。
所以上面 Dataframe 的结果是:
+-------+----------+---------------------+---------------------+
| event | consumer | timestamp | session_timestamp |
+-------+----------+---------------------+---------------------+
| E | 1 | 2020-09-09 13:15:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 13:30:00 | 2020-09-09 13:15:00 |
| E | 1 | 2020-09-09 14:20:00 | 2020-09-09 14:20:00 |
| T | 1 | 2020-09-09 14:35:00 | 2020-09-09 14:20:00 |
| T | 2 | 2020-09-09 13:20:00 | Null |
| E | 2 | 2020-09-09 13:25:00 | 2020-09-09 13:25:00 |
| E | 2 | 2020-09-09 14:45:00 | 2020-09-09 14:45:00 |
| T | 2 | 2020-09-09 14:50:00 | 2020-09-09 14:45:00 |
+-------+----------+---------------------+---------------------+
有没有办法在 Pyspark 上做到这一点?
最佳答案
正如@Ofek 在评论中所说,window
功能会帮助你。这里给你一个scala的例子,你可以自己用python重写。 (考虑到pyspark中用户定义的聚合函数并不容易,这里收集并使用udf处理它)
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df = <your-dataframe>
val findSessionStartTime = udf((rows: Seq[Seq[Any]]) => {
val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var result: Date = null
for (row <- rows.reverse) {
val event = row(0)
val time = parser.parse(row(1).toString)
if (event == "E") {
if (result == null || result.getTime - time.getTime < 3600000) {
result = time
}
}
}
if (result == null)
null
else
parser.format(result)
})
df.withColumn("events", collect_list(array($"event", $"timestamp")).over(Window
.partitionBy($"consumer")
.orderBy($"timestamp")))
.withColumn("session_timestamp", findSessionStartTime($"events"))
.drop("events")
.show(false)
结果如下:
(此外,您描述的示例结果不正确。2020-09-09 14:20:00
和2020-09-09 13:30:00
之间的时间是 50 分钟 < 1 小时)
+-----+--------+-------------------+-------------------+
|event|consumer|timestamp |session_timestamp |
+-----+--------+-------------------+-------------------+
|E |1 |2020-09-09 13:15:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 13:30:00|2020-09-09 13:15:00|
|E |1 |2020-09-09 14:20:00|2020-09-09 13:15:00|
|T |1 |2020-09-09 14:35:00|2020-09-09 13:15:00|
|T |2 |2020-09-09 13:20:00|null |
|E |2 |2020-09-09 13:25:00|2020-09-09 13:25:00|
|E |2 |2020-09-09 14:45:00|2020-09-09 14:45:00|
|T |2 |2020-09-09 14:50:00|2020-09-09 14:45:00|
+-----+--------+-------------------+-------------------+
关于python - 有没有一种方法可以遍历 pyspark 数据框并在没有显式 session key 的情况下识别 session ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64374547/