scala - 当我在不广播的情况下在 Scala 中使用全局 map 变量时会发生什么

标签 scala apache-spark

在scala中,当我在scala中使用全局 map 变量而不广播时会发生什么?

例如。如果我使用 collect* 得到一个变量(比如 collectAsMap ),好像是全局变量,我可以在所有 RDD.mapValues() 中使用函数而不显式广播它。

但我知道 spark 是分布式工作的,它不应该在不广播它的情况下处理全局内存存储变量。所以发生了什么事?

代码示例(此代码在文本中调用 tf-idf,其中 df 存储在 Map 中):

//dfMap is a String->int Map in memory
//Array[(String, Int)] = Array((B,2), (A,3), (C,1))
val dfMap = dfrdd.collectAsMap;
//tfrdd is a rdd, and I can use dfMap in its mapValues function
//tfrdd: Array((doc1,Map(A -> 3.0)), (doc2,Map(A -> 2.0, B -> 1.0)))
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.getOrElse(x._1, 1) ) ); 
tfidfrdd.saveAsTextFile("/somedir/result/");

代码工作得很好。我的问题是那里发生了什么?驱动程序是否像广播一样将 dfMap 发送给所有工作人员?

如果我像这样明确地编码广播有什么区别:
dfMap = sc.broadcast(dfrdd.collectAsMap)
val tfidfrdd = tfrdd.mapValues( e => e.map(x => x._1 -> x._2 * lineNum / dfMap.value.getOrElse(x._1, 1) ) 

最佳答案

全局映射变量和广播变量之间没有区别。如果我们在 RDD 的 map 函数中使用全局变量,那么它将被广播到所有节点。例如:

scala> val list = List(1,2,3)
list: List[Int] = List(1, 2, 3)

scala> val rdd = sc.parallelize(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd.filter(elem => list.contains(elem)).collect
17/03/16 10:21:53 INFO SparkContext: Starting job: collect at <console>:29
17/03/16 10:21:53 INFO DAGScheduler: Got job 3 (collect at <console>:29) with 4 output partitions
17/03/16 10:21:53 INFO DAGScheduler: Final stage: ResultStage 3 (collect at <console>:29)
17/03/16 10:21:53 INFO DAGScheduler: Parents of final stage: List()
17/03/16 10:21:53 INFO DAGScheduler: Missing parents: List()
17/03/16 10:21:53 DEBUG DAGScheduler: submitStage(ResultStage 3)
17/03/16 10:21:53 DEBUG DAGScheduler: missing: List()
17/03/16 10:21:53 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[5] at filter at <console>:29), which has no missing parents
17/03/16 10:21:53 DEBUG DAGScheduler: submitMissingTasks(ResultStage 3)
17/03/16 10:21:53 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 5.0 KB, free 366.3 MB)
17/03/16 10:21:53 DEBUG BlockManager: Put block broadcast_4 locally took  1 ms
17/03/16 10:21:53 DEBUG BlockManager: Putting block broadcast_4 without replication took  1 ms
17/03/16 10:21:53 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 2.5 KB, free 366.3 MB)
17/03/16 10:21:53 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.2.123:37645 (size: 2.5 KB, free: 366.3 MB)
17/03/16 10:21:53 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
17/03/16 10:21:53 DEBUG BlockManager: Told master about block broadcast_4_piece0
17/03/16 10:21:53 DEBUG BlockManager: Put block broadcast_4_piece0 locally took  2 ms
17/03/16 10:21:53 DEBUG ContextCleaner: Got cleaning task CleanBroadcast(1)
17/03/16 10:21:53 DEBUG BlockManager: Putting block broadcast_4_piece0 without replication took  2 ms
17/03/16 10:21:53 DEBUG ContextCleaner: Cleaning broadcast 1
17/03/16 10:21:53 DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast 1
17/03/16 10:21:53 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:996
17/03/16 10:21:53 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 3 (MapPartitionsRDD[5] at filter at <console>:29)
17/03/16 10:21:53 DEBUG DAGScheduler: New pending partitions: Set(0, 1, 2, 3)
17/03/16 10:21:53 INFO TaskSchedulerImpl: Adding task set 3.0 with 4 tasks
17/03/16 10:21:53 DEBUG TaskSetManager: Epoch for TaskSet 3.0: 0
17/03/16 10:21:53 DEBUG TaskSetManager: Valid locality levels for TaskSet 3.0: NO_PREF, ANY
17/03/16 10:21:53 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
17/03/16 10:21:53 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 12, localhost, executor driver, partition 0, PROCESS_LOCAL, 5886 bytes)
17/03/16 10:21:53 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 13, localhost, executor driver, partition 1, PROCESS_LOCAL, 5886 bytes)
17/03/16 10:21:53 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 14, localhost, executor driver, partition 2, PROCESS_LOCAL, 5886 bytes)
17/03/16 10:21:53 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID 15, localhost, executor driver, partition 3, PROCESS_LOCAL, 5886 bytes)
17/03/16 10:21:53 INFO Executor: Running task 0.0 in stage 3.0 (TID 12)
17/03/16 10:21:53 DEBUG Executor: Task 12's epoch is 0
17/03/16 10:21:53 DEBUG BlockManager: Getting local block broadcast_4
17/03/16 10:21:53 DEBUG BlockManager: Level for block broadcast_4 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:21:53 INFO Executor: Running task 2.0 in stage 3.0 (TID 14)
17/03/16 10:21:53 INFO Executor: Running task 1.0 in stage 3.0 (TID 13)
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: removing broadcast 1
17/03/16 10:21:53 DEBUG BlockManager: Removing broadcast 1
17/03/16 10:21:53 DEBUG BlockManager: Removing block broadcast_1
17/03/16 10:21:53 INFO Executor: Running task 3.0 in stage 3.0 (TID 15)
17/03/16 10:21:53 DEBUG Executor: Task 13's epoch is 0
17/03/16 10:21:53 DEBUG MemoryStore: Block broadcast_1 of size 5112 dropped from memory (free 384072627)
17/03/16 10:21:53 DEBUG BlockManager: Removing block broadcast_1_piece0
17/03/16 10:21:53 DEBUG MemoryStore: Block broadcast_1_piece0 of size 2535 dropped from memory (free 384075162)
17/03/16 10:21:53 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.2.123:37645 in memory (size: 2.5 KB, free: 366.3 MB)
17/03/16 10:21:53 DEBUG BlockManagerMaster: Updated info of block broadcast_1_piece0
17/03/16 10:21:53 DEBUG BlockManager: Told master about block broadcast_1_piece0
17/03/16 10:21:53 DEBUG BlockManager: Getting local block broadcast_4
17/03/16 10:21:53 DEBUG BlockManager: Level for block broadcast_4 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:21:53 DEBUG Executor: Task 14's epoch is 0
17/03/16 10:21:53 DEBUG BlockManager: Getting local block broadcast_4
17/03/16 10:21:53 DEBUG BlockManager: Level for block broadcast_4 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:21:53 DEBUG Executor: Task 15's epoch is 0
17/03/16 10:21:53 DEBUG BlockManager: Getting local block broadcast_4
17/03/16 10:21:53 DEBUG BlockManager: Level for block broadcast_4 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 1, response is 0
17/03/16 10:21:53 DEBUG ContextCleaner: Cleaned broadcast 1
17/03/16 10:21:53 DEBUG ContextCleaner: Got cleaning task CleanBroadcast(3)
17/03/16 10:21:53 DEBUG ContextCleaner: Cleaning broadcast 3
17/03/16 10:21:53 DEBUG TorrentBroadcast: Unpersisting TorrentBroadcast 3
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: removing broadcast 3
17/03/16 10:21:53 DEBUG BlockManager: Removing broadcast 3
17/03/16 10:21:53 DEBUG BlockManager: Removing block broadcast_3_piece0
17/03/16 10:21:53 DEBUG MemoryStore: Block broadcast_3_piece0 of size 3309 dropped from memory (free 384078471)
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to 192.168.2.123:40909
17/03/16 10:21:53 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.2.123:37645 in memory (size: 3.2 KB, free: 366.3 MB)
17/03/16 10:21:53 DEBUG BlockManagerMaster: Updated info of block broadcast_3_piece0
17/03/16 10:21:53 DEBUG BlockManager: Told master about block broadcast_3_piece0
17/03/16 10:21:53 DEBUG BlockManager: Removing block broadcast_3
17/03/16 10:21:53 DEBUG MemoryStore: Block broadcast_3 of size 6904 dropped from memory (free 384085375)
17/03/16 10:21:53 INFO Executor: Finished task 1.0 in stage 3.0 (TID 13). 912 bytes result sent to driver
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 3, response is 0
17/03/16 10:21:53 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to 192.168.2.123:40909
17/03/16 10:21:53 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 3
17/03/16 10:21:53 DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
17/03/16 10:21:53 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 13) in 36 ms on localhost (executor driver) (1/4)
17/03/16 10:21:53 INFO Executor: Finished task 2.0 in stage 3.0 (TID 14). 912 bytes result sent to driver
17/03/16 10:21:53 DEBUG ContextCleaner: Cleaned broadcast 3
17/03/16 10:21:53 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 2
17/03/16 10:21:53 INFO Executor: Finished task 0.0 in stage 3.0 (TID 12). 912 bytes result sent to driver
17/03/16 10:21:53 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 14) in 36 ms on localhost (executor driver) (2/4)
17/03/16 10:21:53 INFO Executor: Finished task 3.0 in stage 3.0 (TID 15). 908 bytes result sent to driver
17/03/16 10:21:53 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
17/03/16 10:21:53 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
17/03/16 10:21:53 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID 15) in 36 ms on localhost (executor driver) (3/4)
17/03/16 10:21:53 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 12) in 45 ms on localhost (executor driver) (4/4)
17/03/16 10:21:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
17/03/16 10:21:53 INFO DAGScheduler: ResultStage 3 (collect at <console>:29) finished in 0.045 s
17/03/16 10:21:53 DEBUG DAGScheduler: After removal of stage 3, remaining stages = 0
17/03/16 10:21:53 INFO DAGScheduler: Job 3 finished: collect at <console>:29, took 0.097564 s
res4: Array[Int] = Array(1, 2, 3)

在上面的日志中我们可以清楚地看到全局变量list被广播。所以,当我们明确地广播 list 时就是这种情况。 .
scala> val br = sc.broadcast(list)
17/03/16 10:26:40 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 160.0 B, free 366.3 MB)
17/03/16 10:26:40 DEBUG BlockManager: Put block broadcast_5 locally took  1 ms
17/03/16 10:26:40 DEBUG BlockManager: Putting block broadcast_5 without replication took  1 ms
17/03/16 10:26:40 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 227.0 B, free 366.3 MB)
17/03/16 10:26:40 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.2.123:37645 (size: 227.0 B, free: 366.3 MB)
17/03/16 10:26:40 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
17/03/16 10:26:40 DEBUG BlockManager: Told master about block broadcast_5_piece0
17/03/16 10:26:40 DEBUG BlockManager: Put block broadcast_5_piece0 locally took  1 ms
17/03/16 10:26:40 DEBUG BlockManager: Putting block broadcast_5_piece0 without replication took  1 ms
17/03/16 10:26:40 INFO SparkContext: Created broadcast 5 from broadcast at <console>:26
br: org.apache.spark.broadcast.Broadcast[List[Int]] = Broadcast(5)

scala> rdd.filter(elem => br.value.contains(elem)).collect
17/03/16 10:27:50 INFO SparkContext: Starting job: collect at <console>:31
17/03/16 10:27:50 INFO DAGScheduler: Got job 0 (collect at <console>:31) with 4 output partitions
17/03/16 10:27:50 INFO DAGScheduler: Final stage: ResultStage 0 (collect at <console>:31)
17/03/16 10:27:50 INFO DAGScheduler: Parents of final stage: List()
17/03/16 10:27:50 INFO DAGScheduler: Missing parents: List()
17/03/16 10:27:50 DEBUG DAGScheduler: submitStage(ResultStage 0)
17/03/16 10:27:50 DEBUG DAGScheduler: missing: List()
17/03/16 10:27:50 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at <console>:31), which has no missing parents
17/03/16 10:27:50 DEBUG DAGScheduler: submitMissingTasks(ResultStage 0)
17/03/16 10:27:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.7 KB, free 366.3 MB)
17/03/16 10:27:50 DEBUG BlockManager: Put block broadcast_1 locally took  6 ms
17/03/16 10:27:50 DEBUG BlockManager: Putting block broadcast_1 without replication took  6 ms
17/03/16 10:27:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.2 KB, free 366.3 MB)
17/03/16 10:27:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.2.123:37303 (size: 3.2 KB, free: 366.3 MB)
17/03/16 10:27:50 DEBUG BlockManagerMaster: Updated info of block broadcast_1_piece0
17/03/16 10:27:50 DEBUG BlockManager: Told master about block broadcast_1_piece0
17/03/16 10:27:50 DEBUG BlockManager: Put block broadcast_1_piece0 locally took  2 ms
17/03/16 10:27:50 DEBUG BlockManager: Putting block broadcast_1_piece0 without replication took  2 ms
17/03/16 10:27:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:996
17/03/16 10:27:50 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at <console>:31)
17/03/16 10:27:50 DEBUG DAGScheduler: New pending partitions: Set(0, 1, 2, 3)
17/03/16 10:27:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
17/03/16 10:27:50 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
17/03/16 10:27:50 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
17/03/16 10:27:50 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
17/03/16 10:27:50 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
17/03/16 10:27:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5885 bytes)
17/03/16 10:27:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 5885 bytes)
17/03/16 10:27:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 5885 bytes)
17/03/16 10:27:51 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 5885 bytes)
17/03/16 10:27:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/03/16 10:27:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/16 10:27:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/03/16 10:27:51 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/03/16 10:27:51 DEBUG Executor: Task 0's epoch is 0
17/03/16 10:27:51 DEBUG Executor: Task 2's epoch is 0
17/03/16 10:27:51 DEBUG Executor: Task 3's epoch is 0
17/03/16 10:27:51 DEBUG Executor: Task 1's epoch is 0
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_1
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_1
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_1
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_1
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_1 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_0
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_0
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_0
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 DEBUG BlockManager: Getting local block broadcast_0
17/03/16 10:27:51 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
17/03/16 10:27:51 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 908 bytes result sent to driver
17/03/16 10:27:51 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 999 bytes result sent to driver
17/03/16 10:27:51 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 912 bytes result sent to driver
17/03/16 10:27:51 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 912 bytes result sent to driver
17/03/16 10:27:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 3
17/03/16 10:27:51 DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
17/03/16 10:27:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 2
17/03/16 10:27:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
17/03/16 10:27:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
17/03/16 10:27:51 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 165 ms on localhost (executor driver) (1/4)
17/03/16 10:27:51 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 180 ms on localhost (executor driver) (2/4)
17/03/16 10:27:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 249 ms on localhost (executor driver) (3/4)
17/03/16 10:27:51 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 186 ms on localhost (executor driver) (4/4)
17/03/16 10:27:51 INFO DAGScheduler: ResultStage 0 (collect at <console>:31) finished in 0.264 s
17/03/16 10:27:51 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/03/16 10:27:51 DEBUG DAGScheduler: After removal of stage 0, remaining stages = 0
17/03/16 10:27:51 INFO DAGScheduler: Job 0 finished: collect at <console>:31, took 0.381615 s
res1: Array[Int] = Array(1, 2, 3)

广播变量的情况也是如此。

关于scala - 当我在不广播的情况下在 Scala 中使用全局 map 变量时会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42825166/

相关文章:

scala - 有没有像 "Unbounded Self Type"这样的东西?

scala构造函数-局部变量再次

scala - 如何根据user_id分区的其他列值计算行的差异

scala - ~ 在 Scala 中做什么?

scala - 在akka中输入消息

python - PySpark:将 python 列表中的元素添加到 spark.sql() 语句中

scala - 使用spark将数据写入cassandra

java - 如何将 MLlib Apache Spark 库安装到 JAVA Eclpise 项目中?

python - 如何模拟对 pyspark sql 函数的内部调用

Scala 默认参数值派生自以前的参数