当我编写 RDD 转换时,例如
val rdd = sc.parallelise(1 to 1000)
rdd.map(x => x * 3)
我知道闭包(x => x * 3
)只是一个Function1,需要可序列化,并且我想我在某处读过编辑:它就在文档中暗示的地方:http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark 它被“发送”给 worker 执行。 (例如 Akka 向工作人员发送一段“可执行代码”来运行)
是这样的吗?
我参加的一次聚会上有人评论说,它实际上并没有发送任何序列化代码,但由于每个工作人员无论如何都会获得 jar 的“副本”,因此它只需要引用要运行的函数或类似的东西(但我不确定我引用的那个人是否正确)
我现在对它的实际工作原理感到非常困惑。
所以我的问题是
如何将转换关闭发送给工作人员?通过akka连载?或者它们“已经在那里”,因为 Spark 将整个 uber jar 发送给每个工作人员(对我来说听起来不太可能......)
如果是这样,那么 jar 的其余部分如何发送给 worker ?这是“cleanupClosure”在做什么吗?例如仅向工作人员发送相关字节码而不是整个 uberjar? (例如,只有闭包的依赖代码?)
总而言之,spark 是否在任何时候都会以某种方式将
--jars
类路径中的 jar 与工作人员同步?或者它是否向工作人员发送“适量”的代码?如果它确实发送了闭包,它们是否会被缓存以供重新计算的需要?或者每次安排任务时都会发送带有任务的闭包?抱歉,如果这是一个愚蠢的问题,但我真的不知道。
如果可以的话,请添加答案的来源,我在文档中找不到明确的答案,而且我太谨慎了,无法仅通过阅读代码来得出结论。
最佳答案
闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现 Closure Not Serialized 异常的实例。有一个复杂的代码称为
来自ClosureCleaner.scala
def clean(
closure: AnyRef,
checkSerializable: Boolean = true,
cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
}
尝试缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。
这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:
private def ensureSerializable(func: AnyRef) {
try {
if (SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
关于apache-spark - Spark 如何向工作线程发送闭包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32015547/