scala - Spark UDAF动态输入架构处理

标签 scala apache-spark hadoop apache-spark-sql user-defined-functions

我知道如何将带有内部结构的结构传递给UDAF-
Pass a struct to an UDAF in spark

但是,如何处理内部结构模式未知或动态的情况(就其基于数据的变化而言)。由于输入数据未遵循特定的架构,因此某些字段可能存在也可能不存在。假设一个数据集有

   root
     |-- id:string (nullable = false)
     |-- age: long (nullable = true)
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
     |-- name: string (nullable = true)

而且其他数据集没有car3
root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |-- name: string (nullable = true)

如何编写一个UDAF来接受基于输入数据而更改的架构。

最佳答案

初始化Udaf类时可以动态传递模式-

    val yetAnotherUdaf = new YetAnotherUdaf(schema)

    case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {

      override def deterministic:Boolean=true
      override def dataType:DataType=schema
      override def inputSchema:StructType=schema
      override def bufferSchema:StructType=schema

      override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
      override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
      override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
      override def evaluate(buffer:Row):StructType={ ??? }
   }

关于scala - Spark UDAF动态输入架构处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54548599/

相关文章:

作为函数结果的具有多种类型的 Scala 类型参数

java - 重载方法值 ofDim 与 scala 中的替代方案

function - println _ 在斯卡拉 : Why is the type () => Unit instead of (Any) => Unit?

具有发布者和订阅者特征的 Scala 类

java - 使用 Hadoop 从 Spark 连接到 ElasticSearch 不起作用

apache-spark - PySpark SparkSession Builder 与 Kubernetes Master

windows - 如何在 Windows 10 上逐步设置 Spark

hadoop - 线程 "main"org.apache.hadoop.mapred.InvalidJobConfException : Output directory not set 中出现异常

scala - 使用 Apache Spark 作为 Web 应用程序的后端

java - Hadoop Map Reduce - 将 Iterable<Text> 值写入上下文时,reduce 中的嵌套循环忽略文本结果