scala - 如何在 Spark-scala 中实现 LEAD 和 LAG

标签 scala apache-spark

我在 spark 数据框中有最终记录(连接和过滤后)。我需要比较连续行的(按键分区)列值,并根据需要更改 e_date 列值的条件,例如:

    sample table
    key1 key 2   col1   col2   s_date      e_date
     a     1      cv1     cv2   2014         2099 
     a     1      cv3     cv2   2016         2099 
     b     2      cv5     cv6   2016         2099
     b     2      cv5     cv6   2016         2099

   final table should look like 
    key1 key 2   col1   col2   s_date      e_date
     a     1      cv1     cv2   2014         2015  (next records s_date-1) 
     a     1      cv3     cv2   2016         2099 
     b     2      cv5     cv6   2016         2099
  • 上表有复合键,所以 key1 和 key2 是键
  • 按键在分区上比较 col1 和 col2 值
  • 如果任何列有新值结束旧记录,新记录的 s_date -1(最终表中的第 1 行,2 行)
  • 如果没有变化,则忽略新记录(最终表中的第 3 行)

  • scala-spark 中的任何指针

    最佳答案

    领先和滞后已经实现:

    import org.apache.spark.sql.functions.{lead, lag}
    import org.apache.spark.sql.expressions.Window
    
    lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date))
    

    查询 Introducing Window Functions in Spark SQL详情。

    关于scala - 如何在 Spark-scala 中实现 LEAD 和 LAG,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37754704/

    相关文章:

    java - Java 中的简单 mysql 选择比 MySQL Workbench 中慢

    scala - 如何调用仅存在于任一类型中的两种类型之一的方法?

    hadoop - 如何配置 pyspark 默认写入 HDFS?

    apache-spark - 结构化流写入多个流

    scala - 运行 TPCDS 基准数据集时出现 Spark 错误 - 找不到 dsdgen

    scala - 在另一个 SBT 插件中显式启用 SBT 插件

    javascript - Scala Play Framework 和 Angular JS - 在重复和混合概念方面付出了太多努力

    scala - 如何在生产中演化 akka-persistence 事件?

    scala - spark-csv 包中的 inferSchema

    scala - Spark中的FP增长模型