dictionary - Spark mapPartitions 与 transient 惰性 val

标签 dictionary apache-spark partition transient

我想知道使用 Spark 有什么不同 mapPartitions功能与 transient 惰性值。
由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个 transient 惰性 val 实例(假设它在一个对象中)。

例如:

class NotSerializable(v: Int) {
  def foo(a: Int) = ???
}

object OnePerPartition {
  @transient lazy val obj: NotSerializable = new NotSerializable(10)
}

object Test extends App{
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(1 to 100000)

    rdd.map(OnePerPartition.obj.foo)

    // ---------- VS ----------

    rdd.mapPartitions(itr => {
      val obj = new NotSerializable(10)
      itr.map(obj.foo)
    })
}

有人可能会问为什么你甚至想要它......
我想创建一个通用容器概念,用于在任何通用集合实现( RDDListscalding pipe 等)上运行我的逻辑
他们都有“ map ”的概念,但是mapPartition是唯一的 spark .

最佳答案

首先你不需要transient lazy这里。使用 object包装器足以完成这项工作,您实际上可以将其编写为:

object OnePerExecutor {
  val obj: NotSerializable = new NotSerializable(10)
}

对象包装器和初始化 NotSerializable 之间存在根本区别。内mapPartitions .这个:

rdd.mapPartitions(iter => {
  val ns = NotSerializable(1)
  ???
})

创建单个 NotSerializable每个分区的实例。

另一方面,对象包装器创建一个 NotSerializable每个执行器 JVM 的实例。结果这个实例:
  • 可用于处理多个分区。
  • 可以被多个执行器线程同时访问。
  • 生命周期超过使用它的函数调用。

  • 这意味着它应该是线程安全的,并且任何方法调用都应该没有副作用。

    关于dictionary - Spark mapPartitions 与 transient 惰性 val,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40773830/

    相关文章:

    teradata - SQL 连接多行

    python - 在每个列表项中的字符之后删除所有列表项中的文本 Python

    java - 如何获取 Arraylist 内的 Hashmap 值

    java - 使用 JAXB 解码嵌套 XML 元素

    python - 如何摆脱python windows文件路径字符串中的双反斜杠?

    sql - 对象 sql 不是包 org.apache.spark 的成员

    mysql - 如何按列值 "user_id"和 "gps_time"对mysql进行分区?

    sorting - 我如何使用mapreduce在hadoop集群上实现自适应mergesort

    apache-spark - Pyspark:在数据框中用 null 替换所有出现的值

    hadoop - 为什么我们需要 MapReduce 中的 "map"部分?