scala - 修改 Spark RDD foreach 中的集合

标签 scala apache-spark rdd

我试图在迭代 RDD 的元素时向 map 添加元素。我没有收到任何错误,但没有进行修改。

直接添加或迭代其他集合都可以正常工作:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

但是当我尝试从 RDD 做同样的事情时:
scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

我尝试像在 foreach 之前一样打印 map 的内容,以确保变量相同,并且打印正确:
fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

我还在 foreach 代码中打印了 map 的修改元素,它打印为已修改,但是当操作完成时, map 似乎未修改。
scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

将 RDD 转换为数组(收集)也可以正常工作:
fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

这是上下文问题吗?我是否正在访问正在其他地方修改的数据的副本?

最佳答案

在 Spark 集群(不是单台机器)上运行时,它变得更加清晰。 RDD 现在分布在多台机器上。当您调用 foreach ,你告诉每台机器如何处理它拥有的RDD。如果您引用任何局部变量(如 myMap ),它们会被序列化并发送到机器,因此它们可以使用它。但是什么都没有回来。所以你的 myMap 的原件不受影响。

我认为这回答了您的问题,但显然您正在尝试完成某些事情,而您将无法以这种方式到达那里。随时在此处或在单独的问题中解释您要做什么,我会尽力提供帮助。

关于scala - 修改 Spark RDD foreach 中的集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23394286/

相关文章:

Scala Intellij - Op-Rabbit 语法突出显示问题

scala - 为什么在返回语句后没有死代码警告?

java - Spark javardd 方法collect() 和collectAsync() 之间有什么区别?

python - 在 Spark : Executor lost 中获取错误

python - 有没有什么有效的方法可以将一个大列表的 RDD 分成几个列表而不执行收集

scala - 从 Spark 读取 Hive 表作为数据集

scala - 在 Scala 中指定 lambda 返回类型

python - 为什么 python dataFrames' 只位于同一台机器上?

apache-spark - Spark 结构化流式处理 Kafka 微批处理计数

apache-spark - 从 Eclipse 和 Spark Context 将 Spark 应用程序作为 yarn 作业提交