我有一个带有以下代码的数据框:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
现在检查日志,我发现对于每行UDF都会执行3次。如果我从“test.three”列中添加“test3”,则将再次执行UDF。
有人可以解释我为什么吗?
是否可以正确避免这种情况(即使添加了“测试”,也无需缓存数据框,即使这可行)?
最佳答案
如果要避免多次调用udf(这很有用,尤其是在udf是您工作的瓶颈的情况下),则可以按照以下步骤进行操作:
val testUDF = udf(test _).asNondeterministic()
基本上,您告诉Spark您的函数不是确定性的,现在Spark确保只调用一次,因为多次调用它并不安全(每次调用可能返回不同的结果)。
另请注意,此技巧不是免费的,这样做会给优化器施加一些约束,例如,它的副作用是Spark优化器不会通过不确定的表达式推送过滤器,因此您需要对优化负责过滤条件在查询中的位置。
关于scala - Spark : UDF executed many times,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58696198/