scala - 比较Spark中当前行和上一行的值

标签 scala apache-spark apache-spark-sql

我正在尝试在下面的DataFrame中比较当前行和上一行的记录。我要计算“金额”列。

scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")

scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    70|
|    4|789|    80|
+-----+---+------+

计算逻辑:
  • 对于第1行,AMOUNT应该从第一行开始为50。
  • 对于第2行,如果SL_NO-2和1的ID不相同,则需要考虑
    SL_NO-2(即30)的金额。否则为SL_NO的AMOUNT-1(即-50)
  • 对于第3行,如果SL_NO-3和2的ID不相同,则需要考虑
    SL_NO的金额-3(即70)。否则为SL_NO-2(即-30)
  • AMOUNT

    其他行也需要遵循相同的逻辑。

    预期输出:
    +-----+---+------+
    |SL_NO| ID|AMOUNT|
    +-----+---+------+
    |    1|123|    50|
    |    2|456|    30|
    |    3|456|    30|
    |    4|789|    80|
    +-----+---+------+
    

    请帮忙。

    最佳答案

    您可以将 lag when.otherwise一起使用,这是一个演示:

    import org.apache.spark.sql.expressions.Window
    
    val w = Window.orderBy($"SL_NO")
    dataset.withColumn("AMOUNT", 
        when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w)).otherwise($"AMOUNT")
    ).show
    
    +-----+---+------+
    |SL_NO| ID|AMOUNT|
    +-----+---+------+
    |    1|123|    50|
    |    2|456|    30|
    |    3|456|    30|
    |    4|789|    80|
    +-----+---+------+
    

    注意:因为此示例未使用任何分区,所以它可能存在性能问题,在您的实际数据中,如果您的问题可以由某些变量进行分区,则可能会有所帮助,取决于您的实际问题和ID是否为Window.orderBy($"SL_NO").partitionBy($"ID")一起排序。

    关于scala - 比较Spark中当前行和上一行的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46197571/

    相关文章:

    scala - 如何在 Windows 中启用 SBT 远程调试?

    scala - Apache Spark 缓存如何处理具有非线性 DAG 的未缓存文件源?

    java - 如何获取变量中的数据帧值

    scala - Spark Scala : How to convert Dataframe[vector] to DataFrame[f1:Double, ...,fn:双)]

    apache-spark - 如何知道我的数据有偏差?

    scala - 为什么 Typesafe 激活器命令 `activator dependencies` 不起作用?

    scala - 为什么 Scala 的不可变 Set 在其类型上不是协变的?

    apache-spark - 如何从 Cassandra 表加载元组?

    json - 使用 Spark 2 从 json 解析纪元毫秒

    apache-spark - 在Spark中读取ORC文件时如何保留分区列