apache-spark - pyspark:使用时间序列数据的滚动平均值

标签 apache-spark pyspark window-functions moving-average

我有一个由时间戳列和美元列组成的数据集。我想找到以每行的时间戳结束的每周平均美元数。我最初正在查看 pyspark.sql.functions.window 函数,但它按周对数据进行分类。

下面是一个例子:

%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

这导致两条记录:
|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|

窗口函数对时间序列数据进行分箱,而不是执行滚动平均。

有没有办法执行滚动平均值,我将获得每行的每周平均值,时间段结束于该行的 timestampGMT?

编辑:

下面张的回答接近我想要的,但不完全是我想看到的。

这是一个更好的例子来展示我想要得到的东西:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))

这导致以下数据帧:
dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15

我希望在timestampGMT 列中处理日期的平均值超过一周,这将导致:
dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

在上面的结果中,2017-03-10 的 rolling_average 是 17,因为没有前面的记录。 2017-03-15 的滚动平均值为 15,因为它是 2017-03-15 的 13 和 2017-03-10 的 17 的平均值,后者落在前 7 天窗口内。 2017-03-18 的滚动平均值为 19,因为它是 2017-03-18 的 25 和 2017-03-10 的 13 的平均值,后者落在前 7 天窗口内,并且不包括 2017 的 17 -03-10 因为这不属于前 7 天的窗口。

有没有办法做到这一点,而不是每周窗口不重叠的分箱窗口?

最佳答案

我想出了使用此 stackoverflow 计算移动/滚动平均值的正确方法:

Spark Window Functions - rangeBetween dates

基本思想是将时间戳列转换为秒,然后您可以使用 pyspark.sql.Window 类中的 rangeBetween 函数在窗口中包含正确的行。

这是已解决的示例:

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))

这导致我正在寻找的滚动平均值的确切列:
dollars   timestampGMT            rolling_average
17        2017-03-10 15:27:18.0   17.0
13        2017-03-15 12:27:18.0   15.0
25        2017-03-18 11:27:18.0   19.0

关于apache-spark - pyspark:使用时间序列数据的滚动平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45806194/

相关文章:

python-3.x - 数据框列的 Pyspark 并行循环

sql - 按最大总和选择,但结果中没有总和

sql - 带条件的窗函数

scala - 不带 UDF 的 Spark 数据集的加权平均值

sql - 使用Spark DataFrame groupby时如何获取其他列?

python - 从 PySpark 中的两个不同数据框中减去列的值以查找 RMSE

hadoop - 无法通过PySpark访问HDFS中的文件

sql - 按顺序更新列值,其中新值基于 'previous' 行中的更新值

apache-spark - 在YARN中启用CPU调度是否会真正改善Spark中的并行处理?

hadoop - 运行spark wordcount示例时出现IllegalStateException