serialization - Spark如何处理对象

标签 serialization apache-spark rdd

为了测试spark中的序列化异常,我用两种方法编写了一个任务。
第一种方式:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)
    })        
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

这样, Spark 效果很好。
当我将其更改为以下方式时,它不起作用并抛出NotSerializableException。
第二种方式:
package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)

    val handler = funcs
    val result = rdd.map(elem => {
      handler.func_1(elem)
    })

    println(result.count())

  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

我知道出现错误“任务不可序列化”的原因是因为我试图在第二个示例中将无法序列化的对象funcs从驱动程序节点发送到工作程序节点。对于第二个示例,如果我使对象funcs扩展为Serializable,则此错误将消失。

但是在我看来,因为funcs是一个对象而不是一个类,所以它是一个单例,应该被序列化并从驱动程序传送给工作程序,而不是在工作程序节点本身中实例化。在这种情况下,尽管使用对象funcs的方式不同,但是我猜这两个示例中的无法序列化的对象funcs是从驱动程序节点传输到工作程序节点的。

我的问题是,为什么第一个示例可以成功运行,而第二个示例却因“任务无法序列化”异常而失败。

最佳答案

当您在RDD闭包中运行代码(映射,过滤器等)时,执行该代码所需的一切都将被打包,序列化并发送给执行程序以进行运行。所有被引用的对象(或被引用的字段)都将在此任务中序列化,有时您会在这里获得NotSerializableException

但是,您的用例要复杂一些,并且涉及scala编译器。通常,在scala对象上调用函数等同于调用java静态方法。该对象永远不会真正存在-基本上就像是内联编写代码。但是,如果将对象分配给变量,则实际上是在内存中创建对该对象的引用,并且该对象的行为更像类,并且可能存在序列化问题。

scala> object A { 
  def foo() { 
    println("bar baz")
  }
}
defined module A

scala> A.foo()  // static method
bar baz

scala> val a = A  // now we're actually assigning a memory location
a: A.type = A$@7e0babb1

scala> a.foo()  // dereferences a before calling foo
bar baz

关于serialization - Spark如何处理对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40596871/

相关文章:

c# - 使用 WCF 的 DataContract/Data 成员序列化对象

c# - 序列化和反序列化域事件以在通用实现中持久化并从事件存储中检索

c# - 无法序列化我的游戏统计数据

Python - 将行拆分为列 - csv 数据

scala - 将 Spark 数据帧插入 hbase

scala - 从数据帧访问 scala 映射而不使用 UDF

c++ - Boost - 仅当大小匹配时反序列化数组

python - 在 Spark 和 Python 中编写 flatMap 函数

apache-spark - 如何在spark sql 2.1.0中的Dataset<Row>上获取groupby之后的所有列

python - 在 YARN 上运行的 Spark 如何计算 Python 内存使用量?