I have broadcast this RDD:
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
which returns:
{0: 1, 1: 2, 2: 3, 3: 4}
I have another RDD, which is a list of tuples:
tuples=sc.parallelize([(0,1),(1,2),(3,2)])
My goal is to use each tuple's elements as keys into the broadcast variable and increment the corresponding values by 1.
So after the tuple (0,1), my new broadcast variable would be:
{0: 2, 1: 3, 2: 3, 3: 4}
After the tuple (1,2):
{0: 2, 1: 4, 2: 4, 3: 4}
After the tuple (3,2):
{0: 2, 1: 4, 2: 5, 3: 5}
and finally return the last updated broadcast variable, {0: 2, 1: 4, 2: 5, 3: 5}.
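The intended step-by-step updates above can be sketched in plain Python (no Spark involved), just to make the expected sequential behavior concrete; the keys 0-3 and the starting dict mirror the example data from the question:

```python
# Sequentially apply each tuple to a shared dict, incrementing the
# value at both keys, and record a snapshot after every tuple.
state = {0: 1, 1: 2, 2: 3, 3: 4}
snapshots = []
for a, b in [(0, 1), (1, 2), (3, 2)]:
    state[a] += 1
    state[b] += 1
    snapshots.append(dict(state))  # copy so later updates don't alias

# snapshots now holds the three intermediate dicts listed above,
# ending with {0: 2, 1: 4, 2: 5, 3: 5}.
```

This is exactly the accumulation the question wants; the difficulty is that a Spark map over the tuples does not run sequentially against one shared dict.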
I tried to code this, but it does not work: for each tuple it increments by one, but it does not carry over the result from the previous tuple.
def modify_broadcast(j, test):
    main = j[0]
    context = j[1]
    test.value[main] = test.value[main] + 1
    test.value[context] = test.value[context] + 1
    return test.value
test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
test = sc.broadcast(test.collectAsMap())
print(test.value[0])
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test))
Best answer
A broadcast variable behaves like a shared variable: you can use it for lookups, but you should treat it as read-only. As far as I know, each worker node gets its own local copy of the variable and will update only that copy. The change is not reflected back to the other worker nodes, because the variable is shipped to each node only once.
From the book Learning Spark:
A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.
The process of using broadcast variables is simple:
1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
2. Access its value with the value property (or value() method in Java).
3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
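Since workers cannot push updates back into a broadcast variable, one common pattern is to compute the per-key increments as an aggregation, merge them into the base map on the driver, and broadcast the result. Below is a driver-side sketch in plain Python of that logic (in Spark, the counting step would be something like tuples.flatMap(lambda p: p).countByValue(), and the final dict would go through sc.broadcast again); the variable names are illustrative, not from the original code:

```python
# Count how often each key appears across all tuples, then add those
# counts to the base map in one pass on the driver.
from collections import Counter

base = {0: 1, 1: 2, 2: 3, 3: 4}    # what test.collectAsMap() produced
pairs = [(0, 1), (1, 2), (3, 2)]   # the contents of the tuples RDD

# Flatten every (main, context) pair into individual keys and count them.
counts = Counter(k for pair in pairs for k in pair)

# Merge the counts into the base map; keys never seen keep their value.
updated = {k: v + counts.get(k, 0) for k, v in base.items()}
# updated == {0: 2, 1: 4, 2: 5, 3: 5}
```

Because addition is commutative, the order in which the tuples are processed no longer matters, which is what makes this formulation safe to distribute.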
Regarding "apache-spark - updating a broadcast variable in an RDD", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/42694312/