java - Spark SQL : Window function lag until a condition met

标签 java apache-spark apache-spark-sql

我正在 Spark 中处理这个数据集:

+------------+------------+------------+
|     ColumnA|     ColumnB|     Result |
+------------+------------+------------+
|      ABCDEF|    MNOPQRST|      true  |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    MNOPQRST|      false | (should be true)
|      123455|      UVWXYZ|      false |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    EFGHIJK |      false |
+------------+------------+------------+

规则是:

  1. 如果给定分区的排名为 1,则将结果设置为 true。
  2. 如果排名不是 1 并且 ColumnA 值为 123455,则将 Result 值设置为 false
  3. 如果排名不是 1 并且 ColumnA 值不是 123455 并且 ColumnB 值与上一行的 匹配ColumnB 值,将 Result 设置为 true。确保上一行的 ColumnA 值不是 123455

    WindowSpec w = Window.partitionBy("ColumnA, ColumnB");

    列 matchColumnB =functions.col("ColumnB").equalTo( 函数.lag("ColumnB", 1).over(w));

此处窗口函数检查前一行,而不考虑前一行的 ColumnA 值。

例如,在上面的数据集中,第 3 行的 ColumnB 值应与第 1 行而不是第 2 行进行比较。

我尝试查看 Window.unboundedPreceding 但不确定如何在这种情况下使用它。

有办法实现吗?

最佳答案

复制 DF:

val df = sc.parallelize(List(("ABCDEF","MNOPQRST"), 
                    ("123455","UVWXYZ"),
                    ("ABCDEF","MNOPQRST"),
                    ("123455","UVWXYZ"),
                    ("123455","UVWXYZ"), 
                    ("ABCDEF","EFGHIJK")))
   .toDF("ColumnA","ColumnB")

所提供的信息存在一些矛盾,例如,您的窗口实现使得无法应用上述条件。

在基于行顺序进行工作时,窗口分析有一些要点[排名以及与前一行的比较]

  1. 您需要定义适当的分区列。如果窗口由 columnAcolumnB 分区,那么它们的值对于给定窗口将保持相同。因此,如果需要在leadlag行之间比较columnAcolumnB,那么DF需要由其他列分区。 说明为什么这是一个问题的示例

    val w = Window.partitionBy("ColumnA", "ColumnB").orderBy("ColumnA", "ColumnB");
    df.withColumn("rank", rank.over(w)).show
    +-------+--------+----+
    |ColumnA| ColumnB|rank|
    +-------+--------+----+
    | ABCDEF| EFGHIJK|   1|
    | ABCDEF|MNOPQRST|   1|
    | ABCDEF|MNOPQRST|   1|
    | 123455|  UVWXYZ|   1|
    | 123455|  UVWXYZ|   1|
    | 123455|  UVWXYZ|   1|
    +-------+--------+----+
    

    现在每一行都充当自己的窗口。请注意 order by,第 2 点对此进行了解释。

  2. 窗口中还需要具体的order by语句。如果没有这个排名,“滞后”、“领先”等将变得不确定,因此没有多大意义。 Spark 会尝试防范这种情况,如果没有 order by 子句,窗口函数将抛出异常。 说明为什么这是一个问题的示例

    val w = Window.partitionBy("ColumnA", "ColumnB")
    df.withColumn("result", lag("columnB", 1).over(w))
    

    导致:

    org.apache.spark.sql.AnalysisException: Window function lag('columnB, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag('columnB, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
    

解决方案 回答这个问题本身:我将考虑另外两列来回答您的问题。

val df = sc.parallelize(List(("ABCDEF","MNOPQRST", "P1", "1"), 
                        ("123455","UVWXYZ", "P1", "2"),
                        ("ABCDEF","MNOPQRST", "P1", "3"),
                        ("123455","UVWXYZ", "P1", "4"),
                        ("123455","UVWXYZ", "P1", "5"), 
                        ("BLABLAH","UVWXYZ", "P1", "6"),
                        ("ABCDEF","EFGHIJK", "P1", "7")))
       .toDF("ColumnA","ColumnB", "ColumnP", "ColumnO")
+-------+--------+-------+-------+
|ColumnA| ColumnB|ColumnP|ColumnO|
+-------+--------+-------+-------+
| ABCDEF|MNOPQRST|     P1|      1|
| 123455|  UVWXYZ|     P1|      2|
| ABCDEF|MNOPQRST|     P1|      3|
| 123455|  UVWXYZ|     P1|      4|
| 123455|  UVWXYZ|     P1|      5|
|BLABLAH|  UVWXYZ|     P1|      5|
| ABCDEF| EFGHIJK|     P1|      6|
+-------+--------+-------+-------+

此处,分区列为 columnP,按列排序为 ColumnO

val w = Window.partitionBy("ColumnP").orderBy("ColumnO")
val dfWithWindowing = df.withColumn("lag_columnB", lag("columnB", 1).over(w))
                        .withColumn("rank", rank().over(w))
dfWithWindowing.show
+-------+--------+-------+-------+-----------+----+
|ColumnA| ColumnB|ColumnP|ColumnO|lag_columnB|rank|
+-------+--------+-------+-------+-----------+----+
| ABCDEF|MNOPQRST|     P1|      1|       null|   1|
| 123455|  UVWXYZ|     P1|      2|   MNOPQRST|   2|
| ABCDEF|MNOPQRST|     P1|      3|     UVWXYZ|   3|
| 123455|  UVWXYZ|     P1|      4|   MNOPQRST|   4|
| 123455|  UVWXYZ|     P1|      5|     UVWXYZ|   5|
|BLABLAH|  UVWXYZ|     P1|      6|     UVWXYZ|   6|
| ABCDEF| EFGHIJK|     P1|      7|     UVWXYZ|   7|
+-------+--------+-------+-------+-----------+----+

现在我们拥有了执行所需计算所需的所有信息。规则中没有规定当不满足任何条件时结果的值,实现认为这是真的。

val resultDF = dfWithWindowing.withColumn("result", when($"rank"==="1",true).otherwise(
                              when($"ColumnA"==="123455", false).otherwise(
                                    when($"ColumnB"===$"lag_columnB", true).otherwise(true)
                                 )
                              )
                          ).drop("ColumnP", "ColumnO","lag_columnB","rank")
+-------+--------+------+
|ColumnA| ColumnB|result|
+-------+--------+------+
| ABCDEF|MNOPQRST|  true|
| 123455|  UVWXYZ| false|
| ABCDEF|MNOPQRST|  true|
| 123455|  UVWXYZ| false|
| 123455|  UVWXYZ| false|
|BLABLAH|  UVWXYZ|  true|
| ABCDEF| EFGHIJK|  true|
+-------+--------+------+

了解更多关于窗口化的信息,请参阅https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

关于java - Spark SQL : Window function lag until a condition met,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55597715/

相关文章:

java - 向所有 Activity 全局添加/删除监听器

python - PySpark - 稀疏向量列到矩阵

hadoop - Hive的unbase64()函数的结果在Hive表中是正确的,但在输出文件中变为错误

scala - Spark SQL 未正确转换时区

python - 在 PySpark 中使用列对象代替字符串有什么优点

apache-spark - Spark 结构化流式处理 Kafka 微批处理计数

python - 如何在 PySpark 数据帧的第 0 轴上找到数组(数组列)的平均值?

java - log4j 在哪里/如何查找 log4j.properties 文件?

java - spring 如何同时为多个请求提供单例 bean

java - Android VideoView 不播放视频