scala - 如何对数组列的元素进行切片和求和?

标签 scala apache-spark apache-spark-sql

我要sum (或也执行其他聚合函数)使用 SparkSQL 在数组列上。

我有一张 table

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+

我想总结这个 emp_details 的值柱子 。

预期查询:
sqlContext.sql("select sum(emp_details) from mytable").show

预期结果
1500
180

此外,我也应该能够对范围元素进行总结,例如:
sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show

结果
600
80

当按预期对 Array 类型执行 sum 时,它表明 sum 期望参数是数字类型而不是数组类型。

我认为我们需要为此创建 UDF。但是怎么样?

使用 UDF 时我会遇到任何性能问题吗?
除了UDF之外还有其他解决方案吗?

最佳答案

Spark 2.4.0

截至 Spark 2.4 , Spark SQL 支持 高阶函数 用于操作复杂的数据结构,包括数组。

“现代”解决方案如下:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

input.createOrReplaceTempView("mytable")

val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

您可以在以下文章和视频中找到有关高阶函数的好读物:
  • Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4
  • Working with Nested Data Using Higher Order Functions in SQL on Databricks
  • An Introduction to Higher Order Functions in Spark SQL with Herman van Hovell (Databricks)

  • Spark 2.3.2 及更早版本

    免责声明 我不推荐这种方法(即使它获得了最多的赞成票),因为 Spark SQL 执行的反序列化 Dataset.map .该查询强制 Spark 反序列化数据并将其加载到 JVM(从 Spark 在 JVM 外部管理的内存区域)。这将不可避免地导致更频繁的 GC,从而使性能变差。

    一种解决方案是使用 Dataset Spark SQL 和 Scala 的组合可以显示其强大功能的解决方案。
    scala> val inventory = Seq(
         |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
         |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
    inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]
    
    // I'm too lazy today for a case class
    scala> inventory.as[(Long, String, Seq[Int])].
      map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
      toDF("dept_id", "dept_nm", "sum").
      show
    +-------+-------+----+
    |dept_id|dept_nm| sum|
    +-------+-------+----+
    |     10|Finance|1500|
    |     20|     IT| 180|
    +-------+-------+----+
    

    我将切片部分留作练习,因为它同样简单。

    关于scala - 如何对数组列的元素进行切片和求和?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40151064/

    相关文章:

    scala - Scala 中如何确定类型参数的子类型?

    algorithm - 如何通过在scala中映射来制作字符串序列?

    apache-spark - 如何将转换后的数据从分区发送到 S3?

    algorithm - apache spark 上的不相交集

    python - 有没有办法在不破坏函数链的情况下在 PySpark 中执行强制转换或 withColumn 数据帧操作?

    hadoop - 尝试在Apache Spark中持久存储到数据库时RDD不起作用

    scala - 将Seq或List转换为collection.immutable.Queue

    scala - Spark,在 DataFrame(或 RDD)上多次应用过滤器,无需冗余评估

    r - 使用sparklyr::spark_read_json时添加文件名

    java - 如何基于列合并两个数据帧spark java/scala?