sql - Spark Scala: Getting Cumulative Sum (Running Total) Using Analytical Functions

Tags: sql scala apache-spark apache-spark-sql window-functions

I am implementing a cumulative sum in Spark using window functions.
However, the input order of the records is not maintained when the window partition function is applied.

Input data:

import spark.implicits._  // needed for toDF when running outside spark-shell

val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
  List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
  .map(row => (row(0), row(1), row(2), row(3)))

val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
DS1.show()

+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
+-------+--------+----+----------+

Expected output:

+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
+-------+--------+----+----------+-----------+

I tried the following logic:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val baseDepCumSal = DS1.withColumn("Dept_CumSal", sum("sal").over(Window.partitionBy("dept_no").
  orderBy(col("sal"), col("emp_name"), col("date").asc).
  rowsBetween(Long.MinValue, 0)
))

baseDepCumSal.orderBy("dept_no", "date").show

+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|     3450.0|
|     30|  Martin|1250|2017-11-21|     1250.0|
|     30|    Ward|1250|2018-02-05|     2500.0|
+-------+--------+----+----------+-----------+

For dept_no=10 the records are accumulated in the expected order, but for dept_no=30 they are not accumulated in input order.

Best Answer

This happens because the types are wrong: sal is a string

DS1.printSchema
root
 |-- dept_no: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- date: string (nullable = true)

so it is sorted lexicographically:
DS1.orderBy("sal").show
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
+-------+--------+----+----------+ 
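The effect is easy to reproduce in plain Scala, independent of Spark. A minimal illustration (sals is a throwaway name introduced here): strings are compared character by character, so "950" sorts after "5000" because '9' > '5'.

val sals = Seq("1250", "1300", "2450", "5000", "950")
println(sals.sorted)              // List(1250, 1300, 2450, 5000, 950)
println(sals.map(_.toInt).sorted) // List(950, 1250, 1300, 2450, 5000)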

To get the desired result you have to cast the value (and the frame definition is not needed: once the window has an orderBy, Spark's default frame is already RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which gives a running total):
DS1.withColumn("Dept_CumSal", sum("sal").over(
  Window
    .partitionBy("dept_no")
    .orderBy(col("sal").cast("integer"), col("emp_name"), col("date").asc))).show
+-------+--------+----+----------+-----------+                                  
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
+-------+--------+----+----------+-----------+
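Alternatively, the cast can be done once while preparing the DataFrame, so every later sort and aggregation behaves numerically. Below is a minimal sketch of that variant, assuming the running total should follow the chronological order shown in the expected output (typed and w are names introduced here for illustration):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Fix the types once, up front: sal becomes an integer, date a real date.
val typed = DS1
  .withColumn("sal", col("sal").cast("integer"))
  .withColumn("date", to_date(col("date")))

// Running total per department in chronological order.
val w = Window.partitionBy("dept_no").orderBy("date")

typed.withColumn("Dept_CumSal", sum("sal").over(w))
  .orderBy("dept_no", "date")
  .show()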

Regarding sql - Spark Scala: Getting Cumulative Sum (Running Total) Using Analytical Functions, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48607253/
