scala - 是否可以对 Spark UDAF 进行单元测试?

标签 scala unit-testing apache-spark apache-spark-sql user-defined-functions

Spark UDAF 要求您实现多种方法,特别是def update(buffer: MutableAggregationBuffer, input: Row): Unitdef merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
假设我有一个 UDAF X,4 行 (r0, r1, r2, r3)和两个聚合缓冲区 A, B在我的测试中。
我想看到这段代码产生了预期的结果:

X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)

与仅使用一个缓冲区在 4 行中的每一行上调用 X.update 相同:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)

这样就测试了两种方法的正确性。
但是,我不知道如何编写这样的测试:用户代码似乎无法实例化 MutableAggregationBuffer 的任何实现。 .

如果我只是从我的 4 行中创建一个 DF,并尝试使用 groupBy().agg(...)要调用我的 UDAF,Spark 甚至不会尝试以这种特定方式合并它们 - 因为它的行数很少,所以不需要。

最佳答案

MutableAggregationBuffer只是一个抽象类。您可以轻松创建自己的实现,例如这样的实现:

import org.apache.spark.sql.expressions._

class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
  val values: Array[Any] = init
  def update(i: Int, value: Any) = values(i) = value
  def get(i: Int): Any = values(i)
  def length: Int = init.size
  def copy() = new DummyBuffer(values)
}

它不会取代“真实的东西”,但对于简单的测试场景应该足够了。

关于scala - 是否可以对 Spark UDAF 进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43703207/

相关文章:

在 cobertura-maven-plugin 测试中加载 Spring ApplicationContext

c# - 除非断言失败,否则不构建上下文消息来加速单元测试

python - "getNumPartitions"是一个昂贵的操作吗?

scala - 如何将 Seq[Column] 转换为 Map[String,String] 并更改值?

scala - 如何在包含 RadioButtons 的 Scala 中创建一个新的 ButtonGroup?

scala - Spark - 将 Map 转换为单行 DataFrame

unit-testing - Grails Spring Security登录 Controller 测试

scala - 如果值包含在 scala spark 的列表中,如何过滤行?

hadoop - 从HBase提取数据时,在RDD中获取Null数据

scala - 多态类型的类型类遭受类型删除