sql - What do the 'startTime' argument of pyspark.sql.functions.window and window.start do?

Tags: sql apache-spark dataframe pyspark window

Here is an example:

df = spark.createDataFrame([
    (1, "2017-05-15 23:12:26", 2.5),
    (1, "2017-05-09 15:26:58", 3.5),
    (1, "2017-05-18 15:26:58", 3.6),
    (2, "2017-05-15 15:24:25", 4.8),
    (3, "2017-05-25 15:14:12", 4.6)], ["index", "time", "val"]).orderBy("index", "time")
df.show()

+-----+-------------------+---+
|index|               time|val|
+-----+-------------------+---+
|    1|2017-05-09 15:26:58|3.5|
|    1|2017-05-15 23:12:26|2.5|
|    1|2017-05-18 15:26:58|3.6|
|    2|2017-05-15 15:24:25|4.8|
|    3|2017-05-25 15:14:12|4.6|
+-----+-------------------+---+

For the window function in pyspark.sql.functions:
window(timeColumn, windowDuration, slideDuration=None, startTime=None)

timeColumn: The time column must be of TimestampType.

windowDuration:  Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.

slideDuration: If the 'slideDuration' is not provided, the windows will be tumbling windows.

startTime: the startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
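
To make the docstring's example concrete, here is a minimal sketch of hourly tumbling windows that start 15 minutes past the hour (the durations and aliases below are only illustrative, not part of my actual job):

# Sketch only: hourly tumbling windows beginning 15 minutes past each hour,
# i.e. 12:15-13:15, 13:15-14:15, ...
import pyspark.sql.functions as F

hourly = df.groupBy(F.window("time", windowDuration="1 hour", startTime="15 minutes")) \
           .agg(F.sum("val").alias("sum_val"))
hourly.select("window.start", "window.end", "sum_val").show(truncate=False)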

I want to aggregate the column "val" every 5 days, so I set the parameter "slideDuration" to the string value "5 day":
timeColumn="time", windowDuration="5 day", slideDuration="5 day"

The code is as follows:

import pyspark.sql.functions as F
df2 = df.groupBy("index", F.window("time", windowDuration="5 day", slideDuration="5 day")).agg(F.sum("val").alias("sum_val"))

When I look at the value of window.start, the windows do not start from the minimum time I provided in the "time" column, or from a time I set, but from some other time.

The result is as follows:
+-----+---------------------+---------------------+-------+
|index|start                |end                  |sum_val|
+-----+---------------------+---------------------+-------+
|1    |2017-05-09 08:00:00.0|2017-05-14 08:00:00.0|3.5    |
|1    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|6.1    |
|2    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|4.8    |
|3    |2017-05-24 08:00:00.0|2017-05-29 08:00:00.0|4.6    |
+-----+---------------------+---------------------+-------+

When I set the parameter "startTime" to "0 second" (code below):

df2 = df.groupBy("index", F.window("time", windowDuration="5 day", slideDuration="5 day", startTime="0 second")).agg(F.sum("val").alias("sum_val"))

+-----+---------------------+---------------------+-------+
|index|start                |end                  |sum_val|
+-----+---------------------+---------------------+-------+
|1    |2017-05-09 08:00:00.0|2017-05-14 08:00:00.0|3.5    |
|1    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|6.1    |
|2    |2017-05-14 08:00:00.0|2017-05-19 08:00:00.0|4.8    |
|3    |2017-05-24 08:00:00.0|2017-05-29 08:00:00.0|4.6    |
+-----+---------------------+---------------------+-------+

The result still does not start from the minimum time in the "time" column.

So how can I make this function start its windows from the minimum time in the "time" column, or from a time I set myself, such as "2017-05-09 15:25:30"? Thank you very much for helping me out of this problem.

The official description of 'startTime' is as follows:
The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. 
For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15...
provide `startTime` as `15 minutes`.

References:

1. What does the 'pyspark.sql.functions.window' function's 'startTime' argument do?

2. https://github.com/apache/spark/pull/12008

3. http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.functions.window

Best Answer

The problem you are running into is completely unrelated to startTime, and it has two components:

  • Spark timestamp semantics, in which timestamps are always handled according to the local time zone. Judging by the offsets shown in the output, we can conclude that the JVM uses GMT+8 or an equivalent time zone. Consider the following two scenarios:
    >>> from pyspark.sql.functions import window
    >>>
    >>> spark.conf.get("spark.driver.extraJavaOptions")
    '-Duser.timezone=GMT+8'
    >>> spark.conf.get("spark.executor.extraJavaOptions")
    '-Duser.timezone=GMT+8'
    >>> str(spark.sparkContext._jvm.java.util.TimeZone.getDefault())
    'sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]'
    >>>
    >>> df = spark.createDataFrame([(1,"2017-05-15 23:12:26",2.5)], ["index","time","val"])
    >>> (df
    ...     .withColumn("w", window("time" ,windowDuration="5 days" ,slideDuration="5 days"))
    ...     .show(1, False))
    ...     
    +-----+-------------------+---+---------------------------------------------+
    |index|time               |val|w                                            |
    +-----+-------------------+---+---------------------------------------------+
    |1    |2017-05-15 23:12:26|2.5|[2017-05-14 08:00:00.0,2017-05-19 08:00:00.0]|
    +-----+-------------------+---+---------------------------------------------+
    

    versus
    >>> from pyspark.sql.functions import window
    >>>
    >>> spark.conf.get("spark.driver.extraJavaOptions")
    '-Duser.timezone=UTC'
    >>> spark.conf.get("spark.executor.extraJavaOptions")
    '-Duser.timezone=UTC'
    >>> str(spark.sparkContext._jvm.java.util.TimeZone.getDefault())
    'sun.util.calendar.ZoneInfo[id="UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]'
    >>>
    >>> df = spark.createDataFrame([(1,"2017-05-15 23:12:26",2.5)], ["index","time","val"])
    >>> (df
    ...     .withColumn("w", window("time" ,windowDuration="5 days" ,slideDuration="5 days"))
    ...     .show(1, False))
    ... 
    +-----+-------------------+---+---------------------------------------------+
    |index|time               |val|w                                            |
    +-----+-------------------+---+---------------------------------------------+
    |1    |2017-05-15 23:12:26|2.5|[2017-05-14 00:00:00.0,2017-05-19 00:00:00.0]|
    +-----+-------------------+---+---------------------------------------------+
    

    As you can see, the output is adjusted to the local time zone, while the input string is parsed as a UTC timestamp.
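
    Instead of changing the JVM options, you can also try setting the SQL session time
    zone; a minimal sketch, assuming Spark 2.2 or later where spark.sql.session.timeZone
    is honored:
    >>> # Assumption: Spark >= 2.2, so timestamp handling follows the session time zone
    >>> spark.conf.set("spark.sql.session.timeZone", "UTC")
    >>> (df
    ...     .withColumn("w", window("time", windowDuration="5 days", slideDuration="5 days"))
    ...     .show(1, False))

    With the session time zone set to UTC, the window boundaries should fall on UTC
    midnight, as in the second scenario above.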
  • window semantics. If you look at the execution plan
    >>> df.withColumn("w", window("time",windowDuration="5 days",slideDuration="5 days")).explain(False)
    == Physical Plan ==
    *Project [index#21L, time#22, val#23, window#68 AS w#67]
    +- *Filter (((isnotnull(time#22) && isnotnull(window#68)) && (cast(time#22 as timestamp) >= window#68.start)) && (cast(time#22 as timestamp) < window#68.end))
       +- *Expand [List(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000) + 0), end, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000) + 432000000000)), index#21L, time#22, val#23), List(named_struct(start, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 1) - 1) * 432000000000) + 0), end, ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 1) - 1) * 432000000000) + 432000000000)), index#21L, time#22, val#23)], [window#68, index#21L, time#22, val#23]
          +- Scan ExistingRDD[index#21L,time#22,val#23]
    

    and focus on the start expression as a single component:
    ((((CEIL((cast((precisetimestamp(cast(time#22 as timestamp)) - 0) as double) / 4.32E11)) + 0) - 1) * 432000000000)
    

    you will see that the window takes the ceiling of the numeric value, effectively truncating the timestamp to whole intervals.
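
    The same arithmetic can be reproduced outside Spark; a rough plain-Python sketch of
    the start expression above (constants are in microseconds, variable names are mine,
    not Spark's):
    >>> import math
    >>> from datetime import datetime, timezone
    >>>
    >>> slide_us = 5 * 24 * 60 * 60 * 10**6                      # 432000000000, i.e. 5 days
    >>> ts = datetime(2017, 5, 15, 23, 12, 26, tzinfo=timezone.utc)
    >>> ts_us = int(ts.timestamp() * 10**6)                      # precisetimestamp(...)
    >>> start_us = (math.ceil(ts_us / slide_us) - 1) * slide_us  # (CEIL(ts / slide) - 1) * slide
    >>> datetime.fromtimestamp(start_us / 10**6, tz=timezone.utc)
    datetime.datetime(2017, 5, 14, 0, 0, tzinfo=datetime.timezone.utc)

    In other words, every window boundary is a multiple of the slide duration counted
    from the epoch (plus startTime, which defaults to zero), not from the first value in
    your column.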

  • Finally, startTime in
    df.groupBy("index", F.window("time", windowDuration="5 day", slideDuration="5 day", startTime="0 second"))
    

    has no effect at all, because it behaves exactly like the default (no offset). If anything, you can try:
    from pyspark.sql.functions import col, min as min_, window
    
    # Offset of the earliest timestamp from midnight, in milliseconds
    (startTime, ) = (df
        .select(min_(col("time").cast("timestamp")).alias("ts"))
        .select(
           ((col("ts").cast("double") - 
           col("ts").cast("date").cast("timestamp").cast("double")
          ) * 1000).cast("integer"))
         .first())
    
    # Shift the window boundaries by that offset
    w = window(
        "time", 
        windowDuration="5 days",
        slideDuration="5 days",
        startTime="{} milliseconds".format(startTime))
    
    df.withColumn("w", w).show(1, False)
    

    +-----+-------------------+---+---------------------------------------------+
    |index|time               |val|w                                            |
    +-----+-------------------+---+---------------------------------------------+
    |1    |2017-05-15 23:12:26|2.5|[2017-05-14 23:12:26.0,2017-05-19 23:12:26.0]|
    +-----+-------------------+---+---------------------------------------------+
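
    and, if you need the original aggregation, the same column can be plugged back into
    the groupBy (a sketch; the dict-style agg simply yields a column named sum(val)):
    >>> (df
    ...     .groupBy("index", w)
    ...     .agg({"val": "sum"})
    ...     .show(truncate=False))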
    

Regarding "sql - What do the 'startTime' argument of pyspark.sql.functions.window and window.start do?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48351951/
