scala - Spark : Task not Serializable for UDF on DataFrame

标签 scala serialization apache-spark

当我尝试在 Spark 1.4.1 上执行以下命令时,我收到 org.apache.spark.SparkException: Task not serializable:

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first

这里,frame 是一个位于 HiveContext 中的 DataFrame。该数据框没有任何问题。

我有类似的整数 UDF,它们工作没有任何问题。然而,带有时间戳的似乎会引起问题。根据documentationjava.sql.TimeStamp 实现了Serialized,所以这不是问题。 SimpleDateFormat 也是如此,如 here 所示。 。

这让我相信是 UDF 造成了问题。但是,我不确定修复什么以及如何修复它。

跟踪的相关部分:

Caused by: java.io.NotSerializableException: ...
Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,

最佳答案

尝试:

object ConversionUtils extends Serializable {
  ...
}

关于scala - Spark : Task not Serializable for UDF on DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36794688/

相关文章:

scala - ForkJoinPool 在只有两个 worker 的情况下被阻塞

Scala无法解释的程序行为

javascript - Fabric.js toJSON() 方法缺少图像

java - 在java中覆盖类签名

scala - HDFS : java. io.FileNotFoundException : File does not exist: name. _COPYING

eclipse - Scala Eclipse插件的当前状态是什么?

java Spring 。棘手的 json 和 xml 序列化注释

postgresql - PostgreSQL 中的批量存储过程

dataframe - 添加一个列来计算累积的先前重复值

scala - 更改 Scala 案例类树中的节点