Apache Spark: 3.3.2, Google Dataproc 2.1
scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
scala> val orderDF = Seq(("1234","created","2023-09-17 00:00:00"),("1234","updated","2023-09-17 01:00:00")).toDF("orderID","status","eventTimestamp")
scala> orderDF.show(false)
+-------+-------+-------------------+
|orderID|status |eventTimestamp |
+-------+-------+-------------------+
|1234 |created|2023-09-17 00:00:00|
|1234 |updated|2023-09-17 01:00:00|
+-------+-------+-------------------+
scala> orderDF.select($"orderID",$"status",$"eventTimestamp",last($"status").over(Window.partitionBy($"orderID").orderBy($"eventTimestamp")).alias("latestStatus")).show(false)
+-------+-------+-------------------+------------+
|orderID|status |eventTimestamp |latestStatus|
+-------+-------+-------------------+------------+
|1234 |created|2023-09-17 00:00:00|created |
|1234 |updated|2023-09-17 01:00:00|updated |
+-------+-------+-------------------+------------+
scala> orderDF.select($"orderID",$"status",$"eventTimestamp",first($"status").over(Window.partitionBy($"orderID").orderBy($"eventTimestamp".desc)).alias("latestStatus")).show(false)
+-------+-------+-------------------+------------+
|orderID|status |eventTimestamp |latestStatus|
+-------+-------+-------------------+------------+
|1234 |updated|2023-09-17 01:00:00|updated |
|1234 |created|2023-09-17 00:00:00|updated |
+-------+-------+-------------------+------------+
In the example above, the last function does not honor the Window orderBy that was provided, while the first function works as expected. Is this a bug?
Best Answer
Two things should be changed:
Use min and max instead of first and last
first and last do not return the first and last elements with respect to the given order; they return the first and last elements of the group as Spark happens to "see" them. The documentation says:
The function is non-deterministic because its results depends on the order of the rows which may be non-deterministic after a shuffle.
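As a minimal illustration of that caveat (a sketch, not part of the original answer): in a plain aggregation, the value last returns depends on the physical row order within each group, which a shuffle or repartition can change.

// Non-deterministic: the "last" row of each group depends on how the rows
// are distributed across partitions, so this may print either status.
orderDF.repartition(4).groupBy($"orderID").agg(last($"status").alias("lastSeen")).show(false)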
Add rowsBetween to the window spec when using orderBy
When orderBy is used in a window specification, the frame by default ends at the current row (see the Spark Window API docs), so the frame (the "group") for the first row does not include the second row.
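To make that explicit, the window in the question behaves as if the frame below had been spelled out (a sketch; the name defaultFrame is illustrative):

// With orderBy and no explicit frame, Spark uses
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: each row's frame
// ends at the row itself, so last() just returns the current row's status.
val defaultFrame = Window.partitionBy($"orderID")
  .orderBy($"eventTimestamp")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)
orderDF.select($"status", last($"status").over(defaultFrame).alias("latestStatus")).show(false)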
The code should be changed to:
// Ascending order plus an explicit frame covering the whole partition.
val windowAsc = Window.partitionBy($"orderID")
  .orderBy($"eventTimestamp")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
orderDF.select($"orderID", $"status", $"eventTimestamp", max($"status")
  .over(windowAsc).alias("latestStatus")).show(false)

// Descending order. Note that min and max ignore the ordering entirely,
// so max (not min) is still what returns "updated" here.
val windowDesc = Window.partitionBy($"orderID")
  .orderBy($"eventTimestamp".desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
orderDF.select($"orderID", $"status", $"eventTimestamp", max($"status")
  .over(windowDesc).alias("latestStatus")).show(false)
This produces the expected result: latestStatus is "updated" for both rows.
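One caveat: max($"status") only yields "updated" because it happens to be the lexicographically largest status value. For arbitrary status strings, a sketch using the max_by aggregate avoids relying on that coincidence (max_by is invoked here via expr, on the assumption that your Spark version accepts it as a windowed aggregate; byOrderID is an illustrative name):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.expr

// max_by returns the status of the row with the largest eventTimestamp,
// independent of how the status strings compare to each other.
val byOrderID = Window.partitionBy($"orderID")
orderDF.select($"orderID", $"status", $"eventTimestamp",
  expr("max_by(status, eventTimestamp)").over(byOrderID).alias("latestStatus")).show(false)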
A similar question on this topic, "scala - last function with a Window orderBy expression not working as expected", can be found on Stack Overflow: https://stackoverflow.com/questions/77120817/