scala - 了解 Spark 中的闭包和并行性

标签 scala hadoop apache-spark

我正在尝试了解某些​​事情在 Spark 中是如何工作的。在示例中如 http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

表示代码将对 RDD 中的值求和并将其存储在计数器中,但这里不是这种情况,因为它不起作用。只有当您删除 paralelize 时,它​​才会起作用。

有人可以向我解释一下这是如何工作的吗?还是例子有误?

谢谢

val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

最佳答案

打个比方,上面提供的解释是绝对正确的,让我深入解释一下 ->

让我们假设我们正在一个具有单个工作节点和执行器的节点上工作,并且我们在 RDD 上使用 foreach 来计算 RDD 中的元素数量。正如我们所知,我们在单个节点上,因此数据不会被分发,并且将保持单一身份,因此计数变量(闭包 -> 这些类型的变量称为闭包)将为每个元素计数,并且此更新将是每当发生增量时,每次都会发送给执行器,然后执行器将闭包提交给驱动程序节点。

Drivernode -> executor 和 driver 将驻留在单个节点上,因此驱动节点的计数变量将在执行节点的范围内,因此将更新驱动节点计数变量值。

我们已经从驱动程序节点而非执行程序节点获得了结果计数值。

Executor -> closure -> data

现在假设我们在集群环境中工作,假设有 2 个节点和 2 个 worker 和 executor。现在数据将被分成几个部分,因此 ->

Data -> Data_1, Data_2

Drivernode -> 在不同的节点上有它的计数变量,但对 Executor 1 和 Executor 2 不可见,因为它们驻留在不同的节点上,因此 executor1 和 executor2 无法更新驱动程序节点的 count 变量

Executor1-> processing(Data_1) with closure_1
Executor2-> processing(Data_1) with closure_2

闭包 1 将更新执行器 1,因为它可序列化为执行器 1,类似地,闭包 2 将更新执行器 2

为了解决这种情况,我们像这样使用 Accumulator:

val counter=sc.accumulator(0)

关于scala - 了解 Spark 中的闭包和并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32945165/

相关文章:

scala - 我可以对我的 scala 代码做些什么,以便它编译得更快?

scala - 在 Spark Scala 中禁用科学计数法

scala - Scala中列表的模式匹配结束/中间

json - 如何使用 blueprint.json 文件重新配置 Ambari 服务值

使用 spark-submit 时出现 Hadoop 错误

scala - Eff monad 和 Free monad 的区别

sql - 根据 Hive 中 2 个源表的一些规则更新目标中的 "flag"

java - 如何使用 Java 有效地读取 Hadoop (HDFS) 文件中的第一行?

apache-spark - 如何将spark输出存储到rdbms数据库?

hadoop - 使用 --proxy-user、--keytab 和 --principal 参数在 hadoop kerberos 中提交 spark-submit