假设我有以下代码:
class Context {
def compute() = Array(1.0)
}
val ctx = new Context
val data = ctx.compute
现在我们在 Spark 中运行这段代码:
val rdd = sc.parallelize(List(1,2,3))
rdd.map(_ + data(0)).count()
上面的代码抛出
org.apache.spark.SparkException: Task not serializable
.我是 不问如何解决 ,通过扩展 Serializable 或制作案例类,我想了解错误发生的原因。我不明白的是为什么它提示 Context 类不是
Serializable
,尽管它不是 lambda 的一部分:rdd.map(_ + data(0))
. data
这是一个应该序列化的值数组,但似乎 JVM 也捕获了 ctx
引用也是如此,据我了解,这不应该发生。据我了解,在 shell 中 Spark 应该从 repl 上下文中清除 lambda。如果我们在
delambdafy
之后打印树阶段,我们会看到这些片段:object iw extends Object {
...
private[this] val ctx: $line11.iw$Context = _;
<stable> <accessor> def ctx(): $line11.iw$Context = iw.this.ctx;
private[this] val data: Array[Double] = _;
<stable> <accessor> def data(): Array[Double] = iw.this.data;
...
}
class anonfun$1 ... {
final def apply(x$1: Int): Double = anonfun$1.this.apply$mcDI$sp(x$1);
<specialized> def apply$mcDI$sp(x$1: Int): Double = x$1.+(iw.this.data().apply(0));
...
}
所以发送到工作节点的反编译的 lambda 代码是:
x$1.+(iw.this.data().apply(0))
.部分 iw.this
属于 Spark-Shell session ,因此,据我了解,它应该由 ClosureCleaner 清除, 因为与逻辑无关,不应该被序列化。无论如何,调用iw.this.data()
返回 Array[Double]
data
的值变量,在构造函数中初始化:def <init>(): type = {
iw.super.<init>();
iw.this.ctx = new $line11.iw$Context();
iw.this.data = iw.this.ctx().compute(); // <== here
iw.this.res4 = ...
()
}
据我了解
ctx
value 与 lambda 无关,它不是闭包,因此不应序列化。我错过了什么或误解了什么?
最佳答案
这与 Spark 认为它可以安全地用作闭包有关。这在某些情况下非常直观,因为 Spark 使用反射并且在许多情况下无法识别 Scala 的某些保证(不是完整的编译器或任何东西)或同一对象中的某些变量不相关的事实。为安全起见,Spark 将尝试序列化任何引用的对象,在您的情况下包括 iw
,这是不可序列化的。
ClosureCleaner里面的代码有一个很好的例子:
For instance, transitive cleaning is necessary in the following scenario:
class SomethingNotSerializable { def someValue = 1 def scope(name: String)(body: => Unit) = body def someMethod(): Unit = scope("one") { def x = someValue def y = 2 scope("two") { println(y + 1) } } }
In this example, scope "two" is not serializable because it references scope "one", which references SomethingNotSerializable. Note that, however, the body of scope "two" does not actually depend on SomethingNotSerializable. This means we can safely null out the parent pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two" no longer references SomethingNotSerializable transitively.
可能最简单的解决方法是在同一范围内创建一个局部变量,该变量从您的对象中提取值,这样就不再有对 lambda 内封装对象的任何引用:
val rdd = sc.parallelize(List(1,2,3))
val data0 = data(0)
rdd.map(_ + data0).count()
关于scala - Spark序列化错误之谜,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33709516/