我目前正在尝试在 PySpark 数据帧中提取一系列连续出现的事件并对它们进行排序/排序,如下所示(为方便起见,我已通过 user_id
和 timestamp
对初始数据帧进行了排序):
df_ini
+-------+--------------------+------------+
|user_id| timestamp | actions |
+-------+--------------------+------------+
| 217498| 100000001| 'A' |
| 217498| 100000025| 'A' |
| 217498| 100000124| 'A' |
| 217498| 100000152| 'B' |
| 217498| 100000165| 'C' |
| 217498| 100000177| 'C' |
| 217498| 100000182| 'A' |
| 217498| 100000197| 'B' |
| 217498| 100000210| 'B' |
| 854123| 100000005| 'A' |
| 854123| 100000007| 'A' |
| etc.
到 :
expected df_transformed
+-------+------------+------------+------------+
|user_id| actions | nb_of_occ | order |
+-------+------------+------------+------------+
| 217498| 'A' | 3 | 1 |
| 217498| 'B' | 1 | 2 |
| 217498| 'C' | 2 | 3 |
| 217498| 'A' | 1 | 4 |
| 217498| 'B' | 2 | 5 |
| 854123| 'A' | 2 | 1 |
| etc.
我的猜测是我必须使用一个智能窗口函数,通过 user_id 和操作对表进行分区,但前提是这些操作在时间上是连续的!我不知道该怎么做...
如果有人在 PySpark 中遇到这种类型的转换,我会很高兴得到提示!
干杯
最佳答案
这是一种非常常见的模式,可以通过几个步骤使用窗口函数来表达。首先导入所需的函数:
from pyspark.sql.functions import sum as sum_, lag, col, coalesce, lit
from pyspark.sql.window import Window
接下来定义一个窗口:
w = Window.partitionBy("user_id").orderBy("timestamp")
为每组标记第一行:
is_first = coalesce(
(lag("actions", 1).over(w) != col("actions")).cast("bigint"),
lit(1)
)
定义
order
:order = sum_("is_first").over(w)
并将所有部分与聚合组合在一起:
(df
.withColumn("is_first", is_first)
.withColumn("order", order)
.groupBy("user_id", "actions", "order")
.count())
如果您定义
df
作为:df = sc.parallelize([
(217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'),
(217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'),
(217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'),
(854123, 100000005, 'A'), (854123, 100000007, 'A')
]).toDF(["user_id", "timestamp", "actions"])
并按
user_id
对结果进行排序和 order
你会得到:+-------+-------+-----+-----+
|user_id|actions|order|count|
+-------+-------+-----+-----+
| 217498| A| 1| 3|
| 217498| B| 2| 1|
| 217498| C| 3| 2|
| 217498| A| 4| 1|
| 217498| B| 5| 2|
| 854123| A| 1| 2|
+-------+-------+-----+-----+
关于apache-spark - Pyspark : Custom window function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40404060/