python - 广播变量和mapPartitions

标签 python apache-spark pyspark

上下文

pySpark中,我使用以下代码将变量广播到所有节点:

sc = spark.sparkContext # Get context

# Extract stopwords from a file in hdfs
# The result looks like stopwords = {"and", "foo", "bar" ... }
stopwords = set([line[0] for line in csv.reader(open(SparkFiles.get("stopwords.txt"), 'r'))])

# The set of stopwords is broadcasted now
stopwords = sc.broadcast(stopwords)

广播stopwords后,我想让它在mapPartitions中访问:

# Some dummy-dataframe
df = spark.createDataFrame([(["TESTA and TESTB"], ), (["TESTB and TESTA"], )], ["text"])


# The method which will be applied to mapPartitions
def stopwordRemoval(partition, passed_broadcast):
    """
    Removes stopwords from "text"-column.

    @partition: iterator-object of partition.
    @passed_stopwords: Lookup-table for stopwords.
    """

    # Now the broadcast is passed
    passed_stopwords = passed_broadcast.value

    for row in partition:
        yield [" ".join((word for word in row["text"].split(" ") if word not in passed_stopwords))]


# re-partitioning in order to get mapPartitions working
df = df.repartition(2)

# Now apply the method
df = df.select("text").rdd \
        .mapPartitions(lambda partition: stopwordRemoval(partition, stopwords)) \
        .toDF()

# Result
df.show()

#Result:
+------------+
| text       |
+------------+
|TESTA TESTB |
|TESTB TESTA |
+------------+


问题

尽管它有效,但我不太确定这是否是广播变量的正确用法。所以我的问题是:

  1. 当我以演示的方式将广播传递给 mapParitions 时,广播是否正确执行?
  2. mapParitions 中使用广播是否有用,因为无论如何,停用词都会随该函数分发到所有节点(停用词永远不会重用)?

第二个问题涉及this question这部分回答了我自己的问题。无论如何,具体情况有所不同;这就是为什么我也选择问这个问题。

最佳答案

过了一段时间,我阅读了一些附加信息,这些信息回答了我的问题。因此,我想分享我的见解。


问题1: 当我将广播传递给 mapParitions 时,广播是否正确执行?以演示的方式?

首先值得注意的是 SparkContext.broadcast()是要广播为 can be read in the docs 的变量的包装器。该包装器序列化变量并将信息添加到执行图中,以将此序列化形式分布到节点上。调用广播.value -argument 是使用时再次反序列化变量的命令。 此外,文档指出:

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v [the variable] is not shipped to the nodes more than once.

其次,我发现多个消息来源表明这适用于 UDF s(用户定义函数),例如heremapPartitions()udf() s 应被视为相似,因为在 pySpark 的情况下它们都是相似的,将数据传递给各个节点上的Python实例。

关于这一点,这里是重要的部分:反序列化必须是 Python 函数( udf() 或传递给 mapPartitions() 的任何函数)本身的一部分,这意味着它的 .value参数不得作为函数参数传递。

因此,广播以正确的方式完成:广播包装器作为参数传递,变量在 stopwordRemoval() 内反序列化。 .


问题 2: 是否在 mapParitions 内使用广播有用,因为无论如何,停用词都会随函数一起分发到所有节点(停用词永远不会重用)?

documented只有当序列化为手头的任务带来任何值(value)时,才有优势。

The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

当您有大量引用要广播到集群时,可能会出现这种情况:

[...] to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

如果这适用于您的广播,则广播具有优势。

关于python - 广播变量和mapPartitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60384968/

相关文章:

apache-spark - 使用 prometheus JMX 代理从在单个节点上运行的多个 spark executor 导出指标

apache-spark - 具有高内存的 Spark - 每个节点使用多个执行程序?

apache-spark - 如何将正在运行的 Id 新列添加到 Spark Dataframe ( pyspark)

amazon-web-services - 无法在 AWS Glue PySpark Dev Endpoint 中正确运行脚本

python - 安装 scikits.samplerate 失败

python - 无法从 Cython 重定向错误流

python - Pandas - 度假村柱位置

Python struct.pack()/calcsize()

apache-spark - 如何在Spark-Submit中使用--num-executors选项?

python - 无法推断类型 : <type 'str' > 的架构