scala - RDD API与结合了DataFrame API的UDF对性能的影响

标签 scala performance apache-spark apache-spark-sql rdd

(特定于标量的问题。)

尽管Spark文档鼓励在可能的情况下使用DataFrame API,但如果DataFrame API不足,通常是在退回RDD API还是使用UDF之间进行选择。这两种选择之间是否存在固有的性能差异?

RDD和UDF相似之处在于它们都不可以从Catalyst和Tungsten优化中受益。是否还有其他开销,如果存在,两种方法之间是否有区别?

举一个具体的例子,假设我有一个DataFrame,其中包含一列具有自定义格式(不适合regexp匹配)的文本数据。我需要解析该列并添加一个包含结果标记的新向量列。

最佳答案

neither of them can benefit from Catalyst and Tungsten optimizations



这是不完全正确的。尽管UDF不能从钨优化中受益(可以说简单的SQL转换也不能在那里获得巨大的提升),但您仍然可以从Catalyst提供的执行计划优化中受益。让我们用一个简单的示例进行说明(注意:Spark 2.0和Scala。请勿将其推断到早期版本,尤其是PySpark):
val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)

val df = Seq(("a", 1), ("b", 2)).toDF

df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain

// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

执行计划向我们展示了两件事:
  • Selection在聚合之前已被下推。
  • Projection在聚合之前已被下推,并有效删除了第二个UDF调用。

  • 取决于数据和管道,这几乎可以免费提供实质性的性能提升。

    话虽这么说,RDD和UDF都需要在安全与不安全之间进行迁移,而后者的灵活性明显不足。但是,如果您唯一需要的是简单的类似于map的行为,而无需初始化昂贵的对象(例如数据库连接),那么UDF是可行的方法。

    在稍微复杂的场景中,如果您确实需要访问某些低级功能(例如自定义分区),则可以轻松地使用通用Dataset并保留RDDs

    关于scala - RDD API与结合了DataFrame API的UDF对性能的影响,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38860808/

    相关文章:

    java - 对于性能而言,即使我使用 MVC,Spring Webservices 还是纯粹的 Restful jersey?

    scala - 如何将csv文件转换为rdd

    python - 我似乎无法让 Spark 上的 --py-files 工作

    list - Scala:将 map 大小 n 拆分为列表( map 最大大小为 3)

    scala - 如何使用 spark 在 scala 中获取 XGBoost 的特征重要性?

    scala - 用于列表、选项和其他复杂类型的高级类型 scala

    javascript - 提高 jQuery、预加载、动画的性能,多少算太多?

    scala - 压缩多个序列

    linux - 推荐哪一个: using static lib vs dynamic lib (shared object)

    apache-spark - Spark Streaming - 如何在 updateStateByKey 函数中获取 "Key"