apache-spark - Spark 如何向工作线程发送闭包?

标签 apache-spark

当我编写 RDD 转换时,例如

val rdd = sc.parallelise(1 to 1000) 
rdd.map(x => x * 3)

我知道闭包(x => x * 3)只是一个Function1,需要可序列化,并且我想我在某处读过编辑:它就在文档中暗示的地方:http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark 它被“发送”给 worker 执行。 (例如 Akka 向工作人员发送一段“可执行代码”来运行)

是这样的吗?

我参加的一次聚会上有人评论说,它实际上并没有发送任何序列化代码,但由于每个工作人员无论如何都会获得 jar 的“副本”,因此它只需要引用要运行的函数或类似的东西(但我不确定我引用的那个人是否正确)

我现在对它的实际工作原理感到非常困惑。

所以我的问题是

  1. 如何将转换关闭发送给工作人员?通过akka连载?或者它们“已经在那里”,因为 Spark 将整个 uber jar 发送给每个工作人员(对我来说听起来不太可能......)

  2. 如果是这样,那么 jar 的其余部分如何发送给 worker ?这是“cleanupClosure”在做什么吗?例如仅向工作人员发送相关字节码而不是整个 uberjar? (例如,只有闭包的依赖代码?)

  3. 总而言之,spark 是否在任何时候都会以某种方式将 --jars 类路径中的 jar 与工作人员同步?或者它是否向工作人员发送“适量”的代码?如果它确实发送了闭包,它们是否会被缓存以供重新计算的需要?或者每次安排任务时都会发送带有任务的闭包?抱歉,如果这是一个愚蠢的问题,但我真的不知道。

如果可以的话,请添加答案的来源,我在文档中找不到明确的答案,而且我太谨慎了,无法仅通过阅读代码来得出结论。

最佳答案

闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现 Closure Not Serialized 异常的实例。有一个复杂的代码称为

来自ClosureCleaner.scala

def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

尝试缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。

这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:

  private def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }

关于apache-spark - Spark 如何向工作线程发送闭包?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32015547/

相关文章:

apache-spark - Spark 数据帧 : Convert bigint to timestamp

java - 有没有办法在 UDF 中添加新列(在 java Spark 中)

python - 如何使用 Lambda 函数为 Python 中的字典分配键和值?

apache-spark - 如何在执行器中获取工作目录

java - 从表复制到表(更改主键)

sql - Hadoop 层次结构之谜

python - 在spark Dataframe中动态创建多列

scala - Spark中独立爆炸多个列

apache-spark - 如何正确处理从 Spark Streaming 生成的分区 Parquet 文件

sql - Spark SQL 会完全取代 Apache Impala 或 Apache Hive 吗?