apache-spark - 订购 Pyspark 窗口时缺少数据

标签 apache-spark pyspark apache-spark-sql

这是我当前的数据集:

from pyspark.sql import Window
import pyspark.sql.functions as psf

df = spark.createDataFrame([("2","1",1),
                            ("3","2",2)],
                     schema = StructType([StructField("Data",  StringType()),
                                          StructField("Source",StringType()),
                                          StructField("Date",  IntegerType())]))


display(df.withColumn("Result",psf.collect_set("Data").over(Window.partitionBy("Source").orderBy("Date"))))

输出:

<表类="s-表"> <头> 数据 来源 <日>日期 结果 <正文> 2 1 1 ["2"] 3 1 2 ["2","3"]

为什么在 ordered 的 Window 上使用 collect_set 函数时,Result 列的第一行中缺少值 3

我也尝试过使用 collect_list,但得到的结果相同。

我想要的输出是:

<表类="s-表"> <头> 数据 来源 <日>日期 结果 <正文> 2 1 1 ["2","3"] 3 1 2 ["2","3"]

保留 Result 中值的顺序 - 第一个是 Date = 1,第二个是 Date = 2

最佳答案

您需要使用带有 unboundedPrecedingWindow.unboundedFollowing 的窗口:

Window.partitionBy("Source").orderBy("Date") \
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

当您有 orderBy

时,默认情况下 Spark 使用 rowsBetween(Window.unboundedPreceding, Window.currentRow)

关于apache-spark - 订购 Pyspark 窗口时缺少数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70591443/

相关文章:

apache-spark - 在另一个数据框的UDF中时如何引用数据框?

python - Pyspark - 不确定如何将以下 X 行的总和分配给现有行值

pyspark - 如何使用 DataFrame 在 Spark 中构建坐标矩阵?

apache-spark - 通过 Airflow 调度在 Kubernetes 上运行的 Spark 作业

hadoop - 有没有办法让 Spark 在不使用 Hadoop 的情况下读取 AWS S3 文件?

python - Pyspark DataFrame - 如何使用变量进行连接?

java - org.apache.spark.sql.AnalysisException : No such struct field

apache-spark - 如果我显式传递模式,是否需要在带有 parquet 的 Spark 中使用 "mergeSchema"选项?

apache-spark - 什么是 StringIndexer 、 VectorIndexer 以及如何使用它们?

regex - Spark-SQL 是否支持使用正则表达式规范的 Hive Select All Query with except Columns