python - 访问 Spark RDD 时闭包中局部变量的使用

标签 python apache-spark pyspark closures rdd

我有一个关于访问 Spark RDD 时闭包中局部变量的使用的问题。我想解决的问题如下:

我有一个应该读入 RDD 的文本文件列表。 但是,首先我需要向从单个文本文件创建的 RDD 添加附加信息。此附加信息是从文件名中提取的。然后,使用 union() 将 RDD 放入一个大 RDD。

from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)   
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work: 
# The result is that always Bert will be the owner, i.e., never Ernie.

问题是循环中的map()函数没有引用“正确的”file_owner。相反,它将引用 file_owner 的最新值。在我的本地计算机上,我通过为每个 RDD 调用 cache() 函数来解决这个问题:

# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..

我的问题:使用cache() 是解决我的问题的正确方法吗?有其他选择吗?

非常感谢!

最佳答案

这不是 Spark 现象,而是普通的 Python 现象。

>>> fns = []
>>> for i in range(3):
...   fns.append(lambda: i)
... 
>>> for fn in fns:
...   print fn()
... 
2
2
2

避免这种情况的一种方法是声明具有默认参数的函数。默认值在声明时计算。

>>> fns = []
>>> for i in range(3):
...   def f(i=i):
...     return i
...   fns.append(f)
... 
>>> for fn in fns:
...   print fn()
... 
0
1
2

这个问题经常出现,请参阅以下其他问题:

关于python - 访问 Spark RDD 时闭包中局部变量的使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28204035/

相关文章:

python - 在 Perl 中使用 Spacy 和 Inline::Python 对多 MB 的原始文本进行词形还原。为什么这么慢?

python - 如何在 Python 中导入 r 包

java - 将 python lambda 转换为 scala 或 java。使用 py4j 可以吗?

mysql - Spark 存在错误时丢弃 Hive 表

python - Spark 作业未结束 : Show of dataframe

python - 如何将字典列表转换为 Pyspark DataFrame

python - 如何在jupyter PySpark session 中更改SparkContext属性spark.sql.pivotMaxValues

python - 将 lxml.objectify 的元素转回 XML

apache-spark - 将 Apache Zeppelin notebook 集成到 Web 应用程序中

每个线程使用不同设置的 Python 多线程