我想为基于多个列的一些自定义聚合编写一个 UDAF。一个简单的示例是具有两列 c1 和 c2 的数据框。对于每一行,我取 c1 和 c2 的最大值(我们称之为 cmax),然后取 cmax 的总和。
当我调用 df.agg() 时,我似乎无法将两个或更多列传递给任何聚合方法(包括 UDAF)。第一个问题,这是真的吗?
对于这个简单的示例,我可以创建另一个名为 cmax 的列,并对 cmax 进行聚合。但实际上,我需要基于 N 个列组合进行聚合,结果将是大小为 N 的集合。我希望在 UDAF 的更新方法中循环组合。因此它需要 N 个中间列,这对我来说似乎不是一个干净的解决方案。第二个问题,我想知道创建中间列是否是这样做的方法,或者是否有更好的解决方案。
我注意到在 RDD 中,问题要容易得多。我可以将整个记录传递给我的聚合函数,并且我可以访问所有数据字段。
最佳答案
您可以在 UDAF 中使用尽可能多的列,因为它的 apply
函数的签名接受多个列
(来自其源代码)。
def apply(exprs: Column*): Column
您只需确保 inputSchema
返回一个 StructType
反射(reflect)您想要用作 UDAF 输入的列。
对于列 c1
和 c2
的情况,您的 UDAF 必须实现具有以下架构的 inputSchema
:
def inputSchema: StructType = StructType(Array(StructField("c1", DoubleType), StructField("c2", DoubleType)))
但是,如果您想要更通用的解决方案,您始终可以使用允许返回正确的 inputSchema
的参数来初始化自定义 UDAF。请参阅下面的示例,该示例允许在构造时定义任意 StructType
(注意,我们不验证 StructType
是否为 DoubleType
)。
class MyMaxUDAF(schema: StructType) extends UserDefinedAggregateFunction {
def inputSchema: StructType = this.schema
def bufferSchema: StructType = StructType(Array(StructField("maxSum", DoubleType)))
def dataType: DataType = DoubleType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getDouble(0) + Array.range(0, input.length).map(input.getDouble).max
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = buffer2 match {
case Row(buffer2Sum: Double) => buffer1(0) = buffer1.getDouble(0) + buffer2Sum
}
def evaluate(buffer: Row): Double = buffer match {
case Row(totalSum: Double) => totalSum
}
}
您的 DataFrame 包含用于聚合的值和键。
val df = spark.createDataFrame(Seq(
Entry(0, 1.0, 2.0, 3.0), Entry(0, 3.0, 1.0, 2.0), Entry(1, 6.0, 2.0, 2)
))
df.show
+-------+---+---+---+
|groupMe| c1| c2| c3|
+-------+---+---+---+
| 0|1.0|2.0|3.0|
| 0|3.0|1.0|2.0|
| 1|6.0|2.0|2.0|
+-------+---+---+---+
使用 UDAF,我们预计 max 的总和为 6.0 和 6.0
val fields = Array("c1", "c2", "c3")
val struct = StructType(fields.map(StructField(_, DoubleType)))
val myMaxUDAF: MyMaxUDAF = new MyMaxUDAF(struct)
df.groupBy("groupMe").agg(myMaxUDAF(fields.map(df(_)):_*)).show
+-------+---------------------+
|groupMe|mymaxudaf(c1, c2, c3)|
+-------+---------------------+
| 0| 6.0|
| 1| 6.0|
+-------+---------------------+
有一个关于 UDAF 的很好的教程。不幸的是,它们没有涵盖多个参数。
https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
关于apache-spark - 基于两个或多个列的 Spark DataFrame 聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46310359/