apache-spark - Pyspark : Custom window function

标签 apache-spark pyspark apache-spark-sql window-functions

我目前正在尝试在 PySpark 数据帧中提取一系列连续出现的事件并对它们进行排序/排序,如下所示(为方便起见,我已通过 user_idtimestamp 对初始数据帧进行了排序):

df_ini

+-------+--------------------+------------+
|user_id|     timestamp      |  actions   |
+-------+--------------------+------------+
| 217498|           100000001|    'A'     |
| 217498|           100000025|    'A'     |
| 217498|           100000124|    'A'     |
| 217498|           100000152|    'B'     |
| 217498|           100000165|    'C'     |
| 217498|           100000177|    'C'     |
| 217498|           100000182|    'A'     |
| 217498|           100000197|    'B'     |
| 217498|           100000210|    'B'     |
| 854123|           100000005|    'A'     |
| 854123|           100000007|    'A'     |
| etc.

到 :
expected df_transformed

+-------+------------+------------+------------+
|user_id|  actions   | nb_of_occ  |    order   |
+-------+------------+------------+------------+
| 217498|    'A'     |      3     |     1      |
| 217498|    'B'     |      1     |     2      |
| 217498|    'C'     |      2     |     3      |
| 217498|    'A'     |      1     |     4      |
| 217498|    'B'     |      2     |     5      |
| 854123|    'A'     |      2     |     1      |
| etc.

我的猜测是我必须使用一个智能窗口函数,通过 user_id 和操作对表进行分区,但前提是这些操作在时间上是连续的!我不知道该怎么做...

如果有人在 PySpark 中遇到这种类型的转换,我会很高兴得到提示!

干杯

最佳答案

这是一种非常常见的模式,可以通过几个步骤使用窗口函数来表达。首先导入所需的函数:

from pyspark.sql.functions import sum as sum_, lag, col, coalesce, lit
from pyspark.sql.window import Window

接下来定义一个窗口:
w = Window.partitionBy("user_id").orderBy("timestamp")

为每组标记第一行:
is_first = coalesce(
  (lag("actions", 1).over(w) != col("actions")).cast("bigint"),
  lit(1)
)

定义 order :
order = sum_("is_first").over(w)

并将所有部分与聚合组合在一起:
(df
    .withColumn("is_first", is_first)
    .withColumn("order", order)
    .groupBy("user_id", "actions", "order")
    .count())

如果您定义 df作为:
df = sc.parallelize([
    (217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'),
    (217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'),
    (217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'),
    (854123, 100000005, 'A'), (854123, 100000007, 'A')
]).toDF(["user_id", "timestamp", "actions"])

并按 user_id 对结果进行排序和 order你会得到:

+-------+-------+-----+-----+ 
|user_id|actions|order|count|
+-------+-------+-----+-----+
| 217498|      A|    1|    3|
| 217498|      B|    2|    1|
| 217498|      C|    3|    2|
| 217498|      A|    4|    1|
| 217498|      B|    5|    2|
| 854123|      A|    1|    2|
+-------+-------+-----+-----+

关于apache-spark - Pyspark : Custom window function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40404060/

相关文章:

json - 将 JSON 对象数组转换为 pyspark 中的字符串

apache-spark - 当我并行化一个大列表时 Spark 上下文关闭

hadoop - Spark 作为 Hive 的引擎

python - Pyspark UDF 中自定义 Python 对象的使用

python - PySpark - 根据另一列中引用的列名称创建列

scala - Spark 2.1.1 中获取窗口的最后一个元素

python - Spark Python提交错误: File does not exist: pyspark. zip

python - Pyspark - 为数据框定义自定义架构

python - Dataframe 加入空安全条件使用

scala - 比较Spark中当前行和上一行的值