sql - Spark 窗口函数 - rangeBetween 日期

标签 sql apache-spark pyspark apache-spark-sql window-functions

我有一个 Spark SQL DataFrame带有数据,我想要获取的是给定日期范围内当前行之前的所有行。因此,例如,我希望在给定行之前拥有 7 天前的所有行。我发现我需要使用 Window Function喜欢:

Window \
    .partitionBy('id') \
    .orderBy('start')

问题来了。我想要一个 rangeBetween 7 天,但我在 Spark 文档中找不到任何内容。 Spark 甚至提供这样的选项吗?现在我只是得到所有前面的行:

.rowsBetween(-sys.maxsize, 0)

但想要实现以下目标:

.rangeBetween("7 days", 0)

如果有人可以帮助我解决这个问题,我将不胜感激。提前致谢!

最佳答案

Spark >= 2.3

从 Spark 2.3 开始,可以使用 SQL API 使用间隔对象,但 DataFrame API 支持是 still work in progress .

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Spark < 2.3

据我所知,在 Spark 和 Hive 中都不可能直接使用。两者都需要 ORDER BYRANGE 一起使用的子句为数字。我发现的最接近的事情是转换为时间戳并以秒为单位进行操作。假设 start列包含 date类型:

from pyspark.sql import Row

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

一个小助手和窗口定义:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

最后查询:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

远非漂亮但有效。

* Hive Language Manual, Types

关于sql - Spark 窗口函数 - rangeBetween 日期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56434415/

相关文章:

mysql - 如何指定多个自连接的排序顺序

mysql - insert ... select ... on duplicate key update + delete obsolete 行

scala - 如何在 Spark Streaming 中验证 azure iot hub 的连接字符串?

apache-spark - pyspark - 从 Hive 分区列逻辑获取最新分区

apache-spark - PySpark - 运行进程

sql - SQL Server 中有用的系统存储过程

mysql - 按两列排序不起作用

apache-spark - Hadoop客户端无法连接到datanode

hadoop - 用于长时间运行和大型批处理的 Apache Ignite

apache-spark - Spark : Most efficient way to sort and partition data to be written as parquet