pyspark - 从 pyspark 中的滞后查找下一个不同的值

标签 pyspark

我有一个像这样的 pyspark 数据框,

+-----+----------+
|value|val_joined|
+-----+----------+
|    3|         3|
|    4|       3+4|
|    5|     3+4+5|
|    5|     3+4+5|
|    5|     3+4+5|
|    2|   3+4+5+2|
+-----+----------+

由此,我必须创建另一个像这样的列,

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|   4.0|
|    4|       3+4|   5.0|
|    5|     3+4+5|   2.0|
|    5|     3+4+5|   2.0|
|    5|     3+4+5|   2.0|
|    2|   3+4+5+2|   NaN|
+-----+----------+------+

结果列应如下所示,对于名为 value 的列中的项目,找到按顺序排列的下一个项目。因此,对于值 3,它将是 4,对于值 4,它将是 5。

但是当存在像值 5 这样重复 3 次的重复项时,简单的滞后将不起作用。由于前 5 个的滞后将导致 5。我基本上想重复获取滞后,直到值 != lag(value) 或 lag(value) 为 null。

如何在没有 udf 和 join 的 pyspark 中执行此操作?

最佳答案

我们可以使用 2 个窗口,并通过在另一个窗口中分配 monotonically_increasing_idlast 值,在第一个窗口中查找下一行值,如下所示:

import pyspark.sql.functions as F
w = Window.orderBy('idx')
w1 = Window.partitionBy('value')

(df.withColumn('idx',F.monotonically_increasing_id())
.withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
.drop('idx')).show()

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|     4|
|    4|       3+4|     5|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    2|   3+4+5+2|  null|
+-----+----------+------+

如果 value 中的数字可以重复下面的示例:

+-----+----------+
|value|val_joined|
+-----+----------+
|3    |3         |
|4    |3+4       |
|5    |3+4+5     |
|5    |3+4+5     |
|5    |3+4+5     |
|2    |3+4+5+2   |
|5    |3+4+5+2+5 | <- this value is repeated later
+-----+----------+

然后我们必须创建一个单独的组并将该组作为窗口:

w = Window.orderBy('idx')
w1 = Window.partitionBy('group')

(df.withColumn('idx',F.monotonically_increasing_id())
  .withColumn("lag", F.when(F.lag("value").over(w)!=F.col("value"), F.lit(1))
  .otherwise(F.lit(0)))
  .withColumn("group", F.sum("lag").over(w) + 1).drop("lag")
  .withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
  .drop('idx',"group")).show()

+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
|    3|         3|     4|
|    4|       3+4|     5|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    5|     3+4+5|     2|
|    2|   3+4+5+2|     5|
|    5| 3+4+5+2+5|  null|
+-----+----------+------+

关于pyspark - 从 pyspark 中的滞后查找下一个不同的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61337207/

相关文章:

hadoop - 如何将序列文件转换为拼花格式

ubuntu - 从 Pyspark 访问 HDFS 失败

apache-spark - 如何在Linux环境下按小时计划pyspark脚本

python - Spark运行时错误: uninitialized classmethod object

python-2.7 - 将重复记录合并为 pyspark 数据框中的单个记录

python - 连接条件期间的Pyspark内存问题

pyspark - Databricks/pyspark : How to get all full directory paths (that have at least one file as content) from Azure Blob storage recursively

java - Spark Master在Worker节点应用程序/作业提交后出现IOException后无休止地重新提交

hadoop - Pyspark:获取 HDFS 路径上的文件/目录列表

python - PySpark 如何找到合适数量的集群