scala - Spark序列化错误之谜

标签 scala serialization apache-spark jvm

假设我有以下代码:

class Context {
  def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute

现在我们在 Spark 中运行这段代码:
val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()

上面的代码抛出 org.apache.spark.SparkException: Task not serializable .我是 不问如何解决 ,通过扩展 Serializable 或制作案例类,我想了解错误发生的原因。

我不明白的是为什么它提示 Context 类不是 Serializable ,尽管它不是 lambda 的一部分:rdd.map(_ + data(0)) . data这是一个应该序列化的值数组,但似乎 JVM 也捕获了 ctx引用也是如此,据我了解,这不应该发生。

据我了解,在 shell 中 Spark 应该从 repl 上下文中清除 lambda。如果我们在 delambdafy 之后打印树阶段,我们会看到这些片段:
object iw extends Object {
  ... 
  private[this] val ctx: $line11.iw$Context = _;
  <stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
  private[this] val data: Array[Double] = _;
  <stable> <accessor> def data(): Array[Double] = iw.this.data; 
  ...
}

class anonfun$1 ... {
  final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
  <specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
  ...
}

所以发送到工作节点的反编译的 lambda 代码是:x$1.+(iw.this.data().apply(0)) .部分 iw.this属于 Spark-Shell session ,因此,据我了解,它应该由 ClosureCleaner 清除, 因为与逻辑无关,不应该被序列化。无论如何,调用iw.this.data()返回 Array[Double] data 的值变量,在构造函数中初始化:
def <init>(): type = {
  iw.super.<init>();
  iw.this.ctx = new $line11.iw$Context();
  iw.this.data = iw.this.ctx().compute(); // <== here
  iw.this.res4 = ...
  ()
}

据我了解ctx value 与 lambda 无关,它不是闭包,因此不应序列化。我错过了什么或误解了什么?

最佳答案

这与 Spark 认为它可以安全地用作闭包有关。这在某些情况下非常直观,因为 Spark 使用反射并且在许多情况下无法识别 Scala 的某些保证(不是完整的编译器或任何东西)或同一对象中的某些变量不相关的事实。为安全起见,Spark 将尝试序列化任何引用的对象,在您的情况下包括 iw ,这是不可序列化的。

ClosureCleaner里面的代码有一个很好的例子:

For instance, transitive cleaning is necessary in the following scenario:

class SomethingNotSerializable {
  def someValue = 1
  def scope(name: String)(body: => Unit) = body
  def someMethod(): Unit = scope("one") {
    def x = someValue
    def y = 2
      scope("two") { println(y + 1) }
  }
}

In this example, scope "two" is not serializable because it references scope "one", which references SomethingNotSerializable. Note that, however, the body of scope "two" does not actually depend on SomethingNotSerializable. This means we can safely null out the parent pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two" no longer references SomethingNotSerializable transitively.



可能最简单的解决方法是在同一范围内创建一个局部变量,该变量从您的对象中提取值,这样就不再有对 lambda 内封装对象的任何引用:
val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()

关于scala - Spark序列化错误之谜,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33709516/

相关文章:

scala - 使用 Intel AVX 和 Spark

scala - 我想将 Hive 中所有现有的 UDTF 转换为 Scala 函数并从 Spark SQL 使用它

scala - 当一个是返回值时如何推断类型?

scala - 如何将 "filter"Akka INFO 日志记录输出到不同的文件?

java - BlazeDS环境中的RemoteObject : Serializing objects from JAXB/XJC-generated classes

java - 通过 ObjectStream 序列化问题发送/接收

apache-spark - 将数百万个小文件从 Azure Data Lake Storage 加载到 Databricks

scala - intelliJ运行中的 Spark :无法解析符号和标识符,但找到了字符串,并找到了 ';',但找到了 ')'

scala - "Error during sbt execution: No Scala version specified or detected"来自 Homebrew 的 SBT

delphi - TPopupMenu 作为子组件,序列化 TMenuItems