apache-spark - pyspark function.lag 条件

标签 apache-spark pyspark apache-spark-sql

我正在尝试使用 pyspark 解决问题,

我有一个数据集,例如:

Condition | Date
0 | 2019/01/10
1 | 2019/01/11
0 | 2019/01/15
1 | 2019/01/16
1 | 2019/01/19
0 | 2019/01/23
0 | 2019/01/25
1 | 2019/01/29
1 | 2019/01/30

我想获取满足条件== 1时日期列的最新滞后值

所需的输出类似于:

Condition | Date | Lag
0 | 2019/01/10 | NaN
1 | 2019/01/11 | NaN
0 | 2019/01/15 | 2019/01/11
1 | 2019/01/16 | 2019/01/11
1 | 2019/01/19 | 2019/01/16
0 | 2019/01/23 | 2019/01/19
0 | 2019/01/25 | 2019/01/19
1 | 2019/01/29 | 2019/01/19
1 | 2019/01/30 | 2019/01/29

我怎样才能做到这一点?

请记住它是一个非常大的数据集 - 我必须按 UUID 对其进行分区和分组,因此解决方案必须具有一定的性能。

谢谢,

最佳答案

这是 Pyspark 的解决方案。逻辑与@GordonLinoff 使用 SQL 查询的解决方案相同。

w = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
df.withColumn("Lag", max(when(col("Condition") == lit(1), col("Date"))).over(w)).show()

给予:

+---------+----------+----------+
|Condition|      Date|       Lag|
+---------+----------+----------+
|        0|2019/01/10|      null|
|        1|2019/01/11|      null|
|        0|2019/01/15|2019/01/11|
|        1|2019/01/16|2019/01/11|
|        1|2019/01/19|2019/01/16|
|        0|2019/01/23|2019/01/19|
|        0|2019/01/25|2019/01/19|
|        1|2019/01/29|2019/01/19|
|        1|2019/01/30|2019/01/29|
+---------+----------+----------+

关于apache-spark - pyspark function.lag 条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59502560/

相关文章:

python - 过滤器生成的 PySpark DataFrame - 它存储在哪里?

python - PySpark 将 IntegerTypes 转换为 ByteType 以进行优化

python - PySpark:调用 o51.showString 时发生错误。没有名为 XXX 的模块

postgresql - 从 postgreSQL 读取 100M 行到 Spark 并写入 parquet

apache-spark - Spark - 连接一对多关系数据框

apache-spark - VectorAssembler 失败并显示 java.util.NoSuchElementException : Param handleInvalid does not exist

java - 是否可以使用spark-avro数据源创建Spark应用程序并通过 'java -jar'执行它

docker - 对Docker镜像中的复制文件有误解

python - PySpark jdbc谓词错误: Py4JError: An error occurred while calling o108. jdbc

pyspark - 日期数组中的间隔数组