python - 在 Pyspark 中有效计算加权滚动平均值,但有一些注意事项

标签 python pyspark apache-spark-sql

我正在尝试计算窗口上的滚动加权平均值 (partition by id1, id2 ORDER BY unixTime)在 Pyspark 中,想知道是否有人对如何做到这一点有想法。
滚动平均值将采用当前行的列值、该列的前 9 个行值和该列的 9 个后续行值,并根据每个值在该行中的方式加权。因此,当前行的权重为 10 倍,滞后 1/领先 1 值的权重为 9 倍。
如果没有一个值为空,那么加权平均值的分母将为 100。 一个警告是,如果有空值,我们仍然要计算移动平均值(除非有超过 1/2 的值是空值)。
因此,例如,如果当前 val 之前的 9 个值为空,则分母将为 55。如果超过 1/2 的值为空,那么我们将为加权平均值输出 NULL。我们也可以使用我们说如果分母小于 40 或其他什么的逻辑,输出 null。
我附上了一个屏幕截图来解释我在说什么,以防它令人困惑,希望这可以解决问题:
enter image description here
我知道我可以在 sql 中执行此操作(并且我可以将数据框保存为临时 View ),但是因为我必须为多列执行此滚动平均(完全相同的逻辑),理想情况下,如果我可以在 Pyspark 中执行此操作,我会能够编写一个 for 循环,然后为每一列执行此操作。另外,我很想有效地做到这一点。我已经阅读了许多关于滚动平均线的主题,但我认为这种情况略有不同。
对不起,如果我过于复杂,希望它是有道理的。 如果这不容易有效地做到,我知道如何通过在窗口上列出 lag(val, 10) ... lag(val, 9) over window ... 等来计算它,并且可以使用那。

最佳答案

IIUC,你可以尝试的一种方法是使用Window函数collect_list,对列表进行排序,找到位置idx当前行的使用 array_position ( 需要 Spark 2.4+ )然后基于此计算权重,让我们使用大小为 7 的示例窗口(或以下代码中的 N=3):

from pyspark.sql.functions import expr, sort_array, collect_list, struct
from pyspark.sql import Window

df = spark.createDataFrame([
    (0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77),
    (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)
], ["time", "val"])

N = 3

w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)

# note that the index for array_position is 1-based, `i` in transform function is 0-based
df1 = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))

df1.show(truncate=False)
+----+----+-------------------------------------------------------------------------+---+----------------------+
|time|val |data                                                                     |idx|weights               |
+----+----+-------------------------------------------------------------------------+---+----------------------+
|0   |0.5 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7]]                                |0  |[10, 9, 8, 7]         |
|1   |0.6 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77]]                     |1  |[9, 10, 9, 8, 7]      |
|2   |0.65|[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8]]           |2  |[8, 9, 10, 9, 8, 7]   |
|3   |0.7 |[[0, 0.5], [1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|4   |0.77|[[1, 0.6], [2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9]] |3  |[7, 8, 9, 10, 9, 8, 7]|
|5   |0.8 |[[2, 0.65], [3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|6   |0.7 |[[3, 0.7], [4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]|3  |[7, 8, 9, 10, 9, 8, 7]|
|7   |0.9 |[[4, 0.77], [5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]          |3  |[7, 8, 9, 10, 9, 8]   |
|8   |0.99|[[5, 0.8], [6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                     |3  |[7, 8, 9, 10, 9]      |
|9   |0.95|[[6, 0.7], [7, 0.9], [8, 0.99], [9, 0.95]]                               |3  |[7, 8, 9, 10]         |
+----+----+-------------------------------------------------------------------------+---+----------------------+
然后我们可以使用 SparkSQL 内置函数 aggregate计算权重和加权值的总和:
N = 9

w1 = Window.partitionBy().orderBy('time').rowsBetween(-N,N)

df_new = df.withColumn('data', sort_array(collect_list(struct('time','val')).over(w1))) \
    .withColumn('idx', expr("array_position(data, (time,val))-1")) \
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))\
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
    .withColumn('weighted_val', expr("""
      aggregate(
        zip_with(data,weights, (x,y) -> x.val*y),
        0D, 
        (acc,x) -> acc+x,
        acc -> acc/sum_weights
      )""")) \
    .drop("data", "idx", "sum_weights", "weights")

df_new.show()
+----+----+------------------+
|time| val|      weighted_val|
+----+----+------------------+
|   0| 0.5|0.6827272727272726|
|   1| 0.6|0.7001587301587302|
|   2|0.65|0.7169565217391304|
|   3| 0.7|0.7332876712328767|
|   4|0.77|            0.7492|
|   5| 0.8|0.7641333333333333|
|   6| 0.7|0.7784931506849315|
|   7| 0.9|0.7963768115942028|
|   8|0.99|0.8138095238095238|
|   9|0.95|0.8292727272727273|
+----+----+------------------+
备注:
  • 您可以通过设置 struct('time','val1', 'val2') 来计算多列在计算df_new的第一行然后调整idx的对应计算和 x.val*yweighted_val等等。
  • 要在无法收集不到一半的值时设置 NULL,请添加 IF(size(data) <= 9, NULL, ...)IF(sum_weights < 40, NULL, ...)声明如下:
      df_new = df.withColumn(...) \
      ...
          .withColumn('weighted_val', expr(""" IF(size(data) <= 9, NULL, 
            aggregate( 
              zip_with(data,weights, (x,y) -> x.val*y), 
              0D,  
              (acc,x) -> acc+x, 
              acc -> acc/sum_weights 
           ))""")) \
          .drop("data", "idx", "sum_weights", "weights")
    

  • 编辑:对于多列,您可以尝试:
    cols = ['val1', 'val2', 'val3']
    
    # function to set SQL expression to calculate weighted values for the field `val`
    weighted_vals = lambda val: """
        aggregate(
          zip_with(data,weights, (x,y) -> x.{0}*y),
          0D,
          (acc,x) -> acc+x,
          acc -> acc/sum_weights
        ) as weighted_{0}
    """.format(val)
    
    df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
      .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
      .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
      .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
      .selectExpr(df.columns + [ weighted_vals(c) for c in cols ])
    
    如果列数有限,我们可以编写 SQL 表达式来使用一个聚合函数计算加权值:
    df_new = df.withColumn('data', sort_array(collect_list(struct('time',*cols)).over(w1))) \
      .withColumn('idx', expr("array_position(data, (time,{}))-1".format(','.join(cols)))) \
      .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))")) \
      .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)")) \
      .withColumn("vals", expr(""" 
       aggregate( 
         zip_with(data, weights, (x,y) -> (x.val1*y as val1, x.val2*y as val2)),
         (0D as val1, 0D as val2), 
         (acc,x) -> (acc.val1 + x.val1, acc.val2 + x.val2),
         acc -> (acc.val1/sum_weights as weighted_val1, acc.val2/sum_weights as weighted_val2)
       )     
       """)).select(*df.columns, "vals.*")
    

    关于python - 在 Pyspark 中有效计算加权滚动平均值,但有一些注意事项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63158118/

    相关文章:

    apache-spark - Spark on Databricks - 缓存 Hive 表

    python - 如何创建一个监听文件描述符的 Python 套接字服务器?

    python - 使用python从包含表格网格的图像中提取数据

    apache-spark - 为什么在 Spark 中需要折叠 Action ?

    python - Pyspark:通过搜索字典替换列中的值

    apache-spark - 如何使用变换高阶函数?

    python - 如何从包含单行所有关系的csv文件中获取关系数据?

    python - Pandas 中的轴是什么意思?

    apache-spark - 无法将考拉系列指定为考拉中的新列

    apache-spark - 在 Google Colab 中使用图形框架