apache-spark - 更新 RDD 中的广播变量

标签 apache-spark pyspark rdd

我已经广播了这个 RDD。

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
return :
{0: 1, 1: 2, 2: 3, 3: 4}

我有另一个 RDD,它是元组列表:

tuples=sc.parallelize([(0,1),(1,2),(3,2)])

我的目标是使用元组作为广播变量的键并将其值更新为 1

因此,对于元组 (0,1),我的新广播变量将是。

{0: 2, 1: 3, 2: 3, 3: 4}

对于元组 (1,2)

{0: 2, 1: 4, 2: 4, 3: 4}

对于元组 (3,2)

{0: 2, 1: 4, 2: 5, 3: 5}

并返回最后更新广播的变量{0: 2, 1: 4, 2: 5, 3: 5}

我尝试对其进行编码,但结果不佳,对于每个元组,它都会增加一,但没有考虑最后的结果。

def modify_broadcast(j,test):
  main=j[0]
  context=j[1]
  test.value[main]=test.value[main]+1
  test.value[context]=test.value[context]+1
  return test.value

test = sc.parallelize([(1),(2),(3),(4)]).zipWithIndex().map(lambda x: (x[1],x[0]))
test = sc.broadcast(test.collectAsMap())


print(test.value[0])
coocurence = sc.parallelize([(0,1),(1,2),(3,2)]).map(lambda x: modify_broadcast(x,test))

最佳答案

当你广播时,它就像共享变量。您可以将其用作查找值并将其视为只读。根据我的了解,每个工作节点都将拥有该变量的本地副本,并将更新其自己的副本。这不会反射(reflect)到其他工作节点,因为它们仅传递到每个节点一次。

摘自《学习 Spark 》一书:

A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.

The process of using broadcast variables is simple: 1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable. 2. Access its value with the value property (or value() method in Java). 3. The variable will be sent to each node only once, and should be treated as read- only (updates will not be propagated to other nodes).

关于apache-spark - 更新 RDD 中的广播变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42694312/

相关文章:

scala - Spark 对 RDD 进行排序并加入他们的行列

pyspark - 使用 Synapse Analytics 将数据帧写入 SQL 专用数据库

python - Pyspark:将 RDD 转换为 RowMatrix

azure - Spark for kubernetes - Azure Blob 存储凭据问题

java - Spark : Saving filtered rows in FilterFunction

scala - 将文件名哈希附加到 Spark RDD 的每一行

apache-spark - 在将 PySpark 作业提交到 Google Dataproc 时从 requirements.txt 初始化虚拟环境

python - 如何将 PySpark 数据帧的每个非字符串列与浮点常量相除或相乘?

java - 如何在 Spark Java 中创建复杂的 StructType 架构

apache-spark - Spark : difference when read in . gz 和 .bz2