我正在尝试计算窗口上的滚动加权平均值 (partition by id1, id2 ORDER BY unixTime)
在 Pyspark 中,想知道是否有人对如何做到这一点有想法。
滚动平均值将采用当前行的列值、该列的前 9 个行值和该列的 9 个后续行值,并根据每个值在该行中的方式加权。因此,当前行的权重为 10 倍,滞后 1/领先 1 值的权重为 9 倍。
如果没有一个值为空,那么加权平均值的分母将为 100。 一个警告是,如果有空值,我们仍然要计算移动平均值(除非有超过 1/2 的值是空值)。
因此,例如,如果当前 val 之前的 9 个值为空,则分母将为 55。如果超过 1/2 的值为空,那么我们将为加权平均值输出 NULL。我们也可以使用我们说如果分母小于 40 或其他什么的逻辑,输出 null。
我附上了一个屏幕截图来解释我在说什么,以防它令人困惑,希望这可以解决问题:
我知道我可以在 sql 中执行此操作(并且我可以将数据框保存为临时 View ),但是因为我必须为多列执行此滚动平均(完全相同的逻辑),理想情况下,如果我可以在 Pyspark 中执行此操作,我会能够编写一个 for 循环,然后为每一列执行此操作。另外,我很想有效地做到这一点。我已经阅读了许多关于滚动平均线的主题,但我认为这种情况略有不同。
对不起,如果我过于复杂,希望它是有道理的。 如果这不容易有效地做到,我知道如何通过在窗口上列出 lag(val, 10) ... lag(val, 9) over window ... 等来计算它,并且可以使用那。
最佳答案
IIUC,你可以尝试的一种方法是使用Window函数collect_list,对列表进行排序,找到位置idx
当前行的使用 array_position ( 需要 Spark 2.4+ )然后基于此计算权重,让我们使用大小为 7 的示例窗口(或以下代码中的 N=3):
from pyspark.sql.functions import expr, sort_array, collect_list, struct
from pyspark.sql import Window
df = spark.createDataFrame([
(0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77),
(5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)
], ["time", "val"])
N = 3
w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)
# note that the index for array_position is 1-based, `i` in transform function is 0-based
df1 = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
.withColumn('idx', expr("array_position(data, (time,val))-1")) \
.withColumn('weights', expr("transform(data, (x,i) -> 10 - abs(i-idx))"))
df1.show(truncate=False)
+----+----+-------------------------------------------------------------------------+---+----------------------+
|time|val |data |idx|weights |
+----+----+-------------------------------------------------------------------------+---+----------------------+
|0 |0.5 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7]] |0 |[10, 9, 8, 7] |
|1 |0.6 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77]] |1 |[9, 10, 9, 8, 7] |
|2 |0.65|[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8]] |2 |[8, 9, 10, 9, 8, 7] |
|3 |0.7 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7]] |3 |[7, 8, 9, 10, 9, 8, 7]|
|4 |0.77|[[1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9]] |3 |[7, 8, 9, 10, 9, 8, 7]|
|5 |0.8 |[[2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99]]|3 |[7, 8, 9, 10, 9, 8, 7]|
|6 |0.7 |[[3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]|3 |[7, 8, 9, 10, 9, 8, 7]|
|7 |0.9 |[[4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]] |3 |[7, 8, 9, 10, 9, 8] |
|8 |0.99|[[5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]] |3 |[7, 8, 9, 10, 9] |
|9 |0.95|[[6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]] |3 |[7, 8, 9, 10] |
+----+----+-------------------------------------------------------------------------+---+----------------------+
然后我们可以使用 SparkSQL 内置函数 aggregate计算权重和加权值的总和:N = 9
w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)
df_new = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
.withColumn('idx', expr("array_position(data, (time,val))-1")) \
.withColumn('weights', expr("transform(data, (x,i) -> 10 - abs(i-idx))"))\
.withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
.withColumn('weighted_val', expr("""
aggregate(
zip_with(data,weights, (x,y) -> x.val*y),
0D,
(acc,x) -> acc+x,
acc -> acc/sum_weights
)""")) \
.drop("data", "idx", "sum_weights", "weights")
df_new.show()
+----+----+------------------+
|time| val| weighted_val|
+----+----+------------------+
| 0| 0.5|0.6827272727272726|
| 1| 0.6|0.7001587301587302|
| 2|0.65|0.7169565217391304|
| 3| 0.7|0.7332876712328767|
| 4|0.77| 0.7492|
| 5| 0.8|0.7641333333333333|
| 6| 0.7|0.7784931506849315|
| 7| 0.9|0.7963768115942028|
| 8|0.99|0.8138095238095238|
| 9|0.95|0.8292727272727273|
+----+----+------------------+
备注:struct('time','val1', 'val2')
来计算多列在计算df_new的第一行然后调整idx
的对应计算和 x.val*y
在 weighted_val
等等。IF(size(data) <= 9, NULL, ...)
或 IF(sum_weights < 40, NULL, ...)
声明如下: df_new = df.withColumn(...) \
...
.withColumn('weighted_val', expr(""" IF(size(data) <= 9, NULL,
aggregate(
zip_with(data,weights, (x,y) -> x.val*y),
0D,
(acc,x) -> acc+x,
acc -> acc/sum_weights
))""")) \
.drop("data", "idx", "sum_weights", "weights")
编辑:对于多列,您可以尝试:
cols = ['val1', 'val2', 'val3']
# function to set SQL expression to calculate weighted values for the field `val`
weighted_vals = lambda val: """
aggregate(
zip_with(data,weights, (x,y) -> x.{0}*y),
0D,
(acc,x) -> acc+x,
acc -> acc/sum_weights
) as weighted_{0}
""".format(val)
df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
.withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
.withColumn('weights', expr("transform(data, (x,i) -> 10 - abs(i-idx))")) \
.withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
.selectExpr(df.columns + [ weighted_vals(c) for c in cols ])
如果列数有限,我们可以编写 SQL 表达式来使用一个聚合函数计算加权值:df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
.withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
.withColumn('weights', expr("transform(data, (x,i) -> 10 - abs(i-idx))")) \
.withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
.withColumn("vals", expr("""
aggregate(
zip_with(data, weights, (x,y) -> (x.val1*y as val1, x.val2*y as val2)),
(0D as val1, 0D as val2),
(acc,x) -> (acc.val1 + x.val1, acc.val2 + x.val2),
acc -> (acc.val1/sum_weights as weighted_val1, acc.val2/sum_weights as weighted_val2)
)
""")).select(*df.columns, "vals.*")
关于python - 在 Pyspark 中有效计算加权滚动平均值,但有一些注意事项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63158118/