python - Pyspark - 在 map 转换中使用自定义函数

标签 python apache-spark pyspark

我使用 py.test 运行以下文件(名为 test_func.py):

import findspark
findspark.init()
from pyspark.context import SparkContext

def filtering(data):
    return data.map(lambda p: modif(p)).count()

def modif(row):
    row.split(",")   

class Test(object):
    sc = SparkContext('local[1]')

    def test_filtering(self):
        data = self.sc.parallelize(['1','2', ''])
        assert filtering(data) == 2

并且,由于 modif 函数在 map 转换内部使用,因此会失败并出现以下错误:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named clustering.test_func

pyspark 无法找到 modif 函数。请注意,文件 test_func.py 位于目录 clustering 中,我从 clustering 目录中运行 py.test .

令我惊讶的是,如果我在 map 之外使用 modif 函数,它工作得很好。例如,如果我这样做: modif(data.first())

知道为什么我会遇到这样的导入错误以及如何修复它吗?

<小时/>

编辑

  1. 我已经测试了 Avihoo Mamka 的答案,即将 test_func.py 添加到复制到所有工作人员的文件中。然而,并没有什么效果。这对我来说并不奇怪,因为我认为创建 Spark 应用程序的主文件总是发送给所有工作人员。
  2. 我认为这可能来自于 pyspark 正在寻找 clustering.test_func 而不是 test_func

最佳答案

这里的关键是你得到的Traceback

PySpark 告诉您工作进程无权访问 clustering.test_func.py。当您初始化 SparkContext 时,您可以传递应复制到工作线程的文件列表:

sc = SparkContext("local[1]", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])

更多信息:https://spark.apache.org/docs/1.5.2/programming-guide.html

关于python - Pyspark - 在 map 转换中使用自定义函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37673462/

相关文章:

python - 无法在 Docker 上托管语音识别和声音播放应用程序

python-3.x - 如何在pyspark函数中使用全局变量

python - 绘制 Spark 数据框而不将其转换为 Pandas 的方法

python - 如何防止用户通过 pip 安装包

python - 绑定(bind)到 wxpython 小部件的调整大小事件的函数在调整大小时被多次调用

hadoop - 如何将位于 HDFS 上的类型安全配置文件添加到 spark-submit(集群模式)?

java - 为什么使用单例来包装广播变量?

pandas - 使用 Apache Arrow 将 PySpark DataFrame 转换为 Pandas

python - 在 Spark 中创建分箱直方图

python - Unresolved reference : 'django' error in PyCharm