我使用 py.test
运行以下文件(名为 test_func.py
):
import findspark
findspark.init()
from pyspark.context import SparkContext
def filtering(data):
return data.map(lambda p: modif(p)).count()
def modif(row):
row.split(",")
class Test(object):
sc = SparkContext('local[1]')
def test_filtering(self):
data = self.sc.parallelize(['1','2', ''])
assert filtering(data) == 2
并且,由于 modif
函数在 map
转换内部使用,因此会失败并出现以下错误:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named clustering.test_func
pyspark 无法找到 modif
函数。请注意,文件 test_func.py
位于目录 clustering
中,我从 clustering
目录中运行 py.test
.
令我惊讶的是,如果我在 map
之外使用 modif
函数,它工作得很好。例如,如果我这样做: modif(data.first())
知道为什么我会遇到这样的导入错误以及如何修复它吗?
<小时/>编辑
- 我已经测试了 Avihoo Mamka 的答案,即将
test_func.py
添加到复制到所有工作人员的文件中。然而,并没有什么效果。这对我来说并不奇怪,因为我认为创建 Spark 应用程序的主文件总是发送给所有工作人员。 - 我认为这可能来自于
pyspark
正在寻找clustering.test_func
而不是test_func
。
最佳答案
这里的关键是你得到的Traceback
。
PySpark 告诉您工作进程无权访问 clustering.test_func.py
。当您初始化 SparkContext
时,您可以传递应复制到工作线程的文件列表:
sc = SparkContext("local[1]", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
更多信息:https://spark.apache.org/docs/1.5.2/programming-guide.html
关于python - Pyspark - 在 map 转换中使用自定义函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37673462/