scala - Spark : UDF executed many times

标签 scala apache-spark apache-spark-sql

我有一个带有以下代码的数据框:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

现在检查日志,我发现对于每行UDF都会执行3次。如果我从“test.three”列中添加“test3”,则将再次执行UDF。

有人可以解释我为什么吗?

是否可以正确避免这种情况(即使添加了“测试”,也无需缓存数据框,即使这可行)?

最佳答案

如果要避免多次调用udf(这很有用,尤其是在udf是您工作的瓶颈的情况下),则可以按照以下步骤进行操作:

val testUDF = udf(test _).asNondeterministic()

基本上,您告诉Spark您的函数不是确定性的,现在Spark确保只调用一次,因为多次调用它并不安全(每次调用可能返回不同的结果)。

另请注意,此技巧不是免费的,这样做会给优化器施加一些约束,例如,它的副作用是Spark优化器不会通过不确定的表达式推送过滤器,因此您需要对优化负责过滤条件在查询中的位置。

关于scala - Spark : UDF executed many times,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58696198/

相关文章:

Scala Functor 和 Monad 的区别

scala - 鸵鸟如何用于配置?

java - 将带有 List 的 scala hashmap 转换为带有 java list 的 java hashmap

java - Spark 2.0.0 抛出 AlreadyExistsException(消息 :Database default already exists) when interact with Hive 1. 0.0

apache-spark - 执行pyspark.sql.DataFrame.take(4)需要1个多小时

python - 将 pyspark 字符串转换为日期格式

scala - 编译器无法识别自类型注释

apache-spark - 齐柏林飞艇 [0.7.2] : NullPointerException on executing paragraph from a new Notebook

scala - 在 scala intellij 中保存数据帧会抛出异常

scala:注释访问器方法