我想知道使用 Spark 有什么不同 mapPartitions
功能与 transient 惰性值。
由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个 transient 惰性 val 实例(假设它在一个对象中)。
例如:
class NotSerializable(v: Int) {
def foo(a: Int) = ???
}
object OnePerPartition {
@transient lazy val obj: NotSerializable = new NotSerializable(10)
}
object Test extends App{
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(1 to 100000)
rdd.map(OnePerPartition.obj.foo)
// ---------- VS ----------
rdd.mapPartitions(itr => {
val obj = new NotSerializable(10)
itr.map(obj.foo)
})
}
有人可能会问为什么你甚至想要它......
我想创建一个通用容器概念,用于在任何通用集合实现(
RDD
、 List
、 scalding pipe
等)上运行我的逻辑他们都有“ map ”的概念,但是
mapPartition
是唯一的 spark
.
最佳答案
首先你不需要transient
lazy
这里。使用 object
包装器足以完成这项工作,您实际上可以将其编写为:
object OnePerExecutor {
val obj: NotSerializable = new NotSerializable(10)
}
对象包装器和初始化
NotSerializable
之间存在根本区别。内mapPartitions
.这个:rdd.mapPartitions(iter => {
val ns = NotSerializable(1)
???
})
创建单个
NotSerializable
每个分区的实例。另一方面,对象包装器创建一个
NotSerializable
每个执行器 JVM 的实例。结果这个实例:这意味着它应该是线程安全的,并且任何方法调用都应该没有副作用。
关于dictionary - Spark mapPartitions 与 transient 惰性 val,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40773830/