python - 使用 PySpark 对多列执行滞后

标签 python apache-spark pyspark apache-spark-sql lag

我对 PySpark 相当陌生,但我正在尝试在我的代码中使用最佳实践。我有一个 PySpark 数据框,我想滞后多个列,用滞后值替换原始值。示例:

ID     date        value1     value2     value3
1      2021-12-23  1.1        4.0        2.2
2      2021-12-21  2.4        1.6        11.9
1      2021-12-24  5.4        3.2        7.8
2      2021-12-22  4.2        1.4        9.0
1      2021-12-26  2.3        5.2        7.6
.
.
.

我想根据ID获取所有值,按日期排序,然后将这些值滞后一定量。到目前为止我拥有的代码:

from pyspark.sql import functions as F, Window

window = Window.partitionBy(F.col("ID")).orderBy(F.col("date"))

valueColumns = ['value1', 'value2', 'value3']

df = F.lag(valueColumns, offset=shiftAmount).over(window)

我想要的输出是:

ID     date        value1     value2     value3
1      2021-12-23  Null       Null       Null
2      2021-12-21  Null       Null       Null
1      2021-12-24  1.1        4.0        2.2
2      2021-12-22  2.4        1.6        11.9
1      2021-12-26  5.4        3.2        7.86
.
.
.

我遇到的问题是,据我所知,F.lag 仅接受单个列。我正在寻找有关如何最好地实现这一目标的建议。我想我可以使用 for 循环来附加移位的列或其他内容,但这看起来相当不优雅。谢谢!

最佳答案

对列名称的简单列表理解应该可以完成这项工作:

df = df.select(
    "ID", "date",
    *[F.lag(c, offset=shiftAmount).over(window).alias(c) for c in valueColumns]
)

关于python - 使用 PySpark 对多列执行滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70582516/

相关文章:

python - 在 Python 中从多个列表组成一个集合的首选方法是什么

python - 让 RandomForestClassifier 在训练期间确定选择一个变量

java - 如何使用 Spark 和 JavaRDD 检索特定行?

python - Apache Spark 在 reduceByKey 步骤上速度缓慢

apache-spark - 如何将每个 DStream 保存/插入到永久表中

python - 使用 MLlib 时出现 NumPy 异常,即使安装了 Numpy

使用 select() 的 Python TCP 服务器

python - 使用 Route53 通过 boto 更新 DNS。我应该使用名称还是 CNAME 以及要使用的 TTL

scala - 使用 FlatMap 使用 Spark 和 Scala 将列名称附加到元素

java - 如何计算 Spark 的 Spearman 相关系数?我无法复制统计书中的样本