scala - 带有 Window orderBy 表达式的最后一个函数未按预期工作

标签 scala apache-spark

Apache Spark:3.3.2、Google Dataproc 2.1

scala> val orderDF = Seq(("1234","created","2023-09-17 00:00:00"),("1234","updated","2023-09-17 01:00:00")).toDF("orderID","status","eventTimestamp")

scala> orderDF.show(false)
+-------+-------+-------------------+
|orderID|status |eventTimestamp     |
+-------+-------+-------------------+
|1234   |created|2023-09-17 00:00:00|
|1234   |updated|2023-09-17 01:00:00|
+-------+-------+-------------------+

scala> orderDF.select($"orderID",$"status",$"eventTimestamp",last($"status").over(Window.partitionBy($"orderID").orderBy($"eventTimestamp")).alias("latestStatus")).show(false)
+-------+-------+-------------------+------------+
|orderID|status |eventTimestamp     |latestStatus|
+-------+-------+-------------------+------------+
|1234   |created|2023-09-17 00:00:00|created     |
|1234   |updated|2023-09-17 01:00:00|updated     |
+-------+-------+-------------------+------------+

scala> orderDF.select($"orderID",$"status",$"eventTimestamp",first($"status").over(Window.partitionBy($"orderID").orderBy($"eventTimestamp".desc)).alias("latestStatus")).show(false)
+-------+-------+-------------------+------------+
|orderID|status |eventTimestamp     |latestStatus|
+-------+-------+-------------------+------------+
|1234   |updated|2023-09-17 01:00:00|updated     |
|1234   |created|2023-09-17 00:00:00|updated     |
+-------+-------+-------------------+------------+

在上面的示例中,最后一个函数不遵循提供的 Window orderBy。然而,第一个功能按预期工作。这是一个错误吗?

最佳答案

有两件事应该改变:

使用minmax代替firstlast

firstlast不返回给定顺序意义上的第一个和最后一个元素,而是返回 Spark“看到”的组的第一个和最后一个元素。文档说

The function is non-deterministic because its results depends on the order of the rows which may be non-deterministic after a shuffle.

而不是第一个最后一个minmax应该是首选。

使用 orderBy 时将 rowsBetween 添加到窗口规范

在 Windows 规范中使用 orderBy 时,最后一行的默认值为 current ( Source ),因此第一行的“组”不包含第二行。

代码应更改为

val windowAsc = Window.partitionBy($"orderID")
                      .orderBy($"eventTimestamp")
                      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
orderDF.select($"orderID",$"status",$"eventTimestamp",max($"status")
       .over(windowAsc).alias("latestStatus")).show(false)

val windowDesc = Window.partitionBy($"orderID")
                       .orderBy($"eventTimestamp".desc)
                       .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
orderDF.select($"orderID",$"status",$"eventTimestamp",min($"status")
       .over(windowDesc).alias("latestStatus")).show(false)  }

产生预期的结果。

关于scala - 带有 Window orderBy 表达式的最后一个函数未按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77120817/

相关文章:

java - 在 Apache Spark 中使用 StructType 创建 JSON 架构

scala - Scala 脚本可以引用同一目录中其他未编译的 scala 代码吗?

python - 将 Python 转换为 Scala

hadoop - pyspark : how to check if a file exists in hdfs

scala - 错误 AzureNativeFileSystemStore : DirectoryIsNotEmpty

scala - 如何在 foreachPartition 中使用 SQLContext 和 SparkContext

python - pyspark 上使用 Spark 的代码

Scala:具有复杂结构的树插入尾递归

scala - 找不到参数编码器 : io. circe.Encoder[com.sweetsoft.SapHealth] 的隐含值

scala 元组拆包