我看到的所有 Spark 广播变量示例都在使用它们的函数范围内定义它们( map()
、 join()
等)。我想同时使用 map()
功能和mapPartitions()
引用广播变量的函数,但我想将它们模块化,以便可以将相同的函数用于单元测试。
我的一个想法是对函数进行柯里化(Currying),以便在使用
map
时传递对广播变量的引用。或 mapPartitions
称呼。我有这样的想法(伪代码):
// firstFile.scala
// ---------------
def mapper(bcast: Broadcast)(row: SomeRow): Int = {
bcast.value(row._1)
}
def mapMyPartition(bcast: Broadcast)(iter: Iterator): Iterator {
val broadcastVariable = bcast.value
for {
i <- iter
} yield broadcastVariable(i)
})
// secondFile.scala
// ----------------
import firstFile.{mapMyPartition, mapper}
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
.map(mapper(bcastVariable))
.mapPartitions(mapMyPartition(bcastVariable))
最佳答案
您的解决方案应该可以正常工作。在这两种情况下,函数都传递给 map{Partitions}
序列化时将包含对广播变量本身的引用,但不包含对其值的引用,并且仅调用 bcast.value
在节点上计算时。
需要避免的是
def mapper(bcast: Broadcast): SomeRow => Int = {
val value = bcast.value
row => value(row._1)
}
关于scala - 如何引用范围之外的 Spark 广播变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36849204/