sql - Spark SQL 复杂条件窗口函数

标签 sql apache-spark pyspark apache-spark-sql window-functions

这可能最容易通过示例来解释。假设我有一个用户登录网站的 DataFrame,例如:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

我想在此添加一列,指示他们何时成为网站上的活跃用户。但有一个警告:用户在一段时间内被视为活跃,在此期限之后,如果他们再次登录,他们的 became_active 日期将重置。假设该时间段为5 天。那么从上表派生出的所需表格将如下所示:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

因此,特别是 SirChillingtonIV 的 became_active 日期被重置,因为他们的第二次登录是在事件期到期后进行的,但 Booooooo99900098 的 became_active 日期在他第二次/时并未重置她登录了,因为它处于活跃期。

我最初的想法是使用带有lag的窗口函数,然后使用lagged值填充became_active列;例如,开头大致如下:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

然后,如果 tmpnull(即,如果这是第一次登录),则填写 became_active 日期的规则为) 或者如果 login_date - tmp >= 5became_active = login_date;否则,转到 tmp 中的下一个最新值并应用相同的规则。这表明了一种递归方法,我很难想象一种实现方法。

我的问题:这是一种可行的方法吗?如果是,我如何“返回”并查看 tmp 的早期值,直到找到我停止的位置?据我所知,我无法迭代 Spark SQL Column 的值。还有其他方法可以达到这个结果吗?

最佳答案

Spark >= 3.2

最近的 Spark 版本为批处理和结构化流查询中的 session 窗口提供 native 支持(请参阅 SPARK-10816 及其子任务,特别是 SPARK-34893 )。

官方文档提供了不错的usage example .

Spark <3.2

这就是窍门。导入一堆函数:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

定义窗口:

val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")

找到新 session 开始的点:

val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

查找每个 session 的最早日期:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

数据集定义为:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

结果是:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+

关于sql - Spark SQL 复杂条件窗口函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42448564/

相关文章:

SQL不适用于大样本

apache-spark - 在保持分区的同时缓存数据帧

python - 如何使用初始 GaussianMixtureModel 训练 GMM?

python - Pyspark 与 DBUtils

apache-spark - SparkContext对象没有属性esRDD(elasticsearch-spark连接器)

java - 无法将 5k/秒的记录插入到 impala 中?

MySQL - 选择行程次数最多但不重复的项目

mysql - 根据两个id获取计数

apache-spark - 如何处理 yarn 客户端中运行时间过长的任务(与其他工作相比)?

scala - Spark Scala 数据帧查找最大值