为了测试spark中的序列化异常,我用两种方法编写了一个任务。
第一种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val data = List(1,2,3,4,5)
val rdd = sc.makeRDD(data)
val result = rdd.map(elem => {
funcs.func_1(elem)
})
println(result.count())
}
}
object funcs{
def func_1(i:Int): Int = {
i + 1
}
}
这样, Spark 效果很好。
当我将其更改为以下方式时,它不起作用并抛出NotSerializableException。
第二种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object dd {
def main(args: Array[String]):Unit = {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val data = List(1,2,3,4,5)
val rdd = sc.makeRDD(data)
val handler = funcs
val result = rdd.map(elem => {
handler.func_1(elem)
})
println(result.count())
}
}
object funcs{
def func_1(i:Int): Int = {
i + 1
}
}
我知道出现错误“任务不可序列化”的原因是因为我试图在第二个示例中将无法序列化的对象
funcs
从驱动程序节点发送到工作程序节点。对于第二个示例,如果我使对象funcs
扩展为Serializable
,则此错误将消失。但是在我看来,因为
funcs
是一个对象而不是一个类,所以它是一个单例,应该被序列化并从驱动程序传送给工作程序,而不是在工作程序节点本身中实例化。在这种情况下,尽管使用对象funcs
的方式不同,但是我猜这两个示例中的无法序列化的对象funcs
是从驱动程序节点传输到工作程序节点的。我的问题是,为什么第一个示例可以成功运行,而第二个示例却因“任务无法序列化”异常而失败。
最佳答案
当您在RDD闭包中运行代码(映射,过滤器等)时,执行该代码所需的一切都将被打包,序列化并发送给执行程序以进行运行。所有被引用的对象(或被引用的字段)都将在此任务中序列化,有时您会在这里获得NotSerializableException
。
但是,您的用例要复杂一些,并且涉及scala编译器。通常,在scala对象上调用函数等同于调用java静态方法。该对象永远不会真正存在-基本上就像是内联编写代码。但是,如果将对象分配给变量,则实际上是在内存中创建对该对象的引用,并且该对象的行为更像类,并且可能存在序列化问题。
scala> object A {
def foo() {
println("bar baz")
}
}
defined module A
scala> A.foo() // static method
bar baz
scala> val a = A // now we're actually assigning a memory location
a: A.type = A$@7e0babb1
scala> a.foo() // dereferences a before calling foo
bar baz
关于serialization - Spark如何处理对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40596871/