我正在尝试在下面的DataFrame
中比较当前行和上一行的记录。我要计算“金额”列。
scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")
scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
| 1|123| 50|
| 2|456| 30|
| 3|456| 70|
| 4|789| 80|
+-----+---+------+
计算逻辑:
SL_NO-2(即30)的金额。否则为SL_NO的AMOUNT-1(即-50)
SL_NO的金额-3(即70)。否则为SL_NO-2(即-30)
其他行也需要遵循相同的逻辑。
预期输出:
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
| 1|123| 50|
| 2|456| 30|
| 3|456| 30|
| 4|789| 80|
+-----+---+------+
请帮忙。
最佳答案
您可以将 lag
与when.otherwise
一起使用,这是一个演示:
import org.apache.spark.sql.expressions.Window
val w = Window.orderBy($"SL_NO")
dataset.withColumn("AMOUNT",
when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w)).otherwise($"AMOUNT")
).show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
| 1|123| 50|
| 2|456| 30|
| 3|456| 30|
| 4|789| 80|
+-----+---+------+
注意:因为此示例未使用任何分区,所以它可能存在性能问题,在您的实际数据中,如果您的问题可以由某些变量进行分区,则可能会有所帮助,取决于您的实际问题和ID是否为
Window.orderBy($"SL_NO").partitionBy($"ID")
一起排序。
关于scala - 比较Spark中当前行和上一行的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46197571/