python - ModuleNotFoundError 因为 PySpark 序列化程序无法找到库文件夹

标签 python apache-spark pyspark google-cloud-dataproc

我有以下文件夹结构

 - libfolder
    - lib1.py
    - lib2.py
 - main.py
main.py电话libfolder.lib1.py然后调用 libfolder.lib2.py和别的。

在本地机器上一切正常,但在我将其部署到 Dataproc 后,出现以下错误
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 455, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'libfolder'

我已将文件夹压缩到 xyz.zip并运行以下命令:
spark-submit --py-files=xyz.zip main.py

序列化程序无法找到 libfolder 的位置.我打包文件夹的方式有问题吗?

此问题类似于 this one但它没有回答。

编辑:对伊戈尔问题的回应

zip 文件的 unzip -l 返回以下内容
 - libfolder
    - lib1.py
    - lib2.py
 - main.py

在 main.py 中使用此导入语句调用 lib1.py
from libfolder import lib1

最佳答案

这对我有用:

$ cat main.py

from pyspark import SparkContext, SparkConf

from subpkg import sub

conf = SparkConf().setAppName("Shell Count")
sc = SparkContext(conf = conf)

text_file = sc.textFile("file:///etc/passwd")
counts = text_file.map(lambda line: sub.map(line)) \
    .map(lambda shell: (shell, 1)) \
    .reduceByKey(lambda a, b: sub.reduce(a, b))

counts.saveAsTextFile("hdfs:///count5.txt")

$ cat subpkg/sub.py

def map(line):
  return line.split(":")[6]

def reduce(a, b):
  return a + b

$ unzip -l /tmp/deps.zip 
Archive:  /tmp/deps.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
        0  2019-01-07 14:22   subpkg/
        0  2019-01-07 13:51   subpkg/__init__.py
       79  2019-01-07 14:13   subpkg/sub.py
---------                     -------
       79                     3 files


$ gcloud dataproc jobs submit pyspark --cluster test-cluster main.py --py-files deps.zip
Job [1f0f15108a4149c5942f49513ce04440] submitted.
Waiting for job output...
Hello world!
Job [1f0f15108a4149c5942f49513ce04440] finished successfully.

关于python - ModuleNotFoundError 因为 PySpark 序列化程序无法找到库文件夹,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53863576/

相关文章:

python - django.db.utils.IntegrityError : FOREIGN KEY constraint failed

Python:WebKit.WebView:如何在错误时重新加载?

scala - 修改了spark中的countByKey

apache-spark - 如何将流式数据集写入 Cassandra?

python - 执行动态 python 函数以响应 HTTP 调用

python - 使用 lxml、xpath 在 Python 中解析 HTML

mysql - Spark : Exception in thread "main" java. lang.ClassNotFoundException : com. mysql.jdbc.Driver

apache-spark - 来自 RDD 的 PySpark LDA 模型密集向量

python - 如何在 PySpark 中从年、月和日创建日期?

apache-spark - Spark 在运行期间用千兆字节的小文件填充临时目录