python - ModuleNotFoundError in PySpark worker on rdd.collect()

Tags: python apache-spark pyspark apache-spark-sql

I'm running an Apache Spark program in Python and I'm getting an error that I can't understand and can't begin to debug. My driver program defines a function called hound in a file named hound.py. In the same directory I have a file named hound_base.py, which defines a function called hound_base_func. To call it from hound, I import it with "from hound_base import hound_base_func". That works; I call the function and pass it a Spark DataFrame. hound_base_func takes the DataFrame as an argument, does some work on its underlying RDD, and calls rdd.collect(). That is the call that actually crashes the code, with the message "ModuleNotFoundError: No module named 'hound_base'", which makes no sense! It claims it can't find the very module whose code is executing. I'm happy to provide as many details as needed, but this is everything I know that is relevant to the problem... Any hints on how I can fix this?
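For reference, here is a minimal sketch of the layout described above. The function bodies are invented for illustration; only the two-file structure, the import, and the rdd.collect() call correspond to the question:

    # hound_base.py (same directory as hound.py) -- illustrative only
    def hound_base_func(df):
        rdd_result = df.rdd.map(lambda row: row.asDict())  # some work on the underlying RDD
        return rdd_result.collect()                        # this is the call that crashes

    # hound.py (the driver script) -- illustrative only
    from pyspark.sql import SparkSession
    from hound_base import hound_base_func

    def hound(csv_path, field1, field2, field3, field4_list):
        spark = SparkSession.builder.appName("hound").getOrCreate()
        data = spark.read.csv(csv_path, header=True)
        hound_base_func(data)

    if __name__ == "__main__":
        hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])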

Full traceback

2018-06-14 14:29:26 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2018-06-14 14:29:26 WARN  TaskSetManager:66 - Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

2018-06-14 14:29:26 ERROR TaskSetManager:70 - Task 0 in stage 2.0 failed 1 times; aborting job
[Stage 2:>                                                          (0 + 1) / 1]Traceback (most recent call last):
  File "F:\data\src\hound.py", line 43, in <module>
    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])
  File "F:\data\src\hound.py", line 37, in hound
    hound_base_func(data)
  File "F:\data\src\hound_base.py", line 220, in hound_base_func
    rdd_collected = rdd_result.collect()
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more


SUCCESS: The process with PID 18960 (child process of PID 6380) has been terminated.
SUCCESS: The process with PID 6380 (child process of PID 1400) has been terminated.
SUCCESS: The process with PID 1400 (child process of PID 19344) has been terminated.
[Finished in 21.811s]

Accepted answer

There were multiple problems here:

First, you cannot access the Spark context from an executor task, that is, from any function called inside rdd.map().
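A hypothetical illustration of that restriction (Spark refuses to serialize any closure that captures the SparkContext, so nothing like the commented-out line below can run on a worker):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("demo").getOrCreate()
    sc = spark.sparkContext
    rdd = sc.parallelize([1, 2, 3])

    # Not allowed: the lambda captures `sc`, and a SparkContext cannot be shipped
    # to an executor, so this cannot run inside a task.
    # bad = rdd.map(lambda x: sc.parallelize([x]).count()).collect()

    # Executor-side functions should be plain Python that uses no driver objects:
    good = rdd.map(lambda x: x * 2).collect()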

Second, using functions defined elsewhere inside a .map lambda is tricky. One solution is to move all the function definitions into the calling function where possible. If any of them live in a different file, you must add that file explicitly with spark_context.addPyFile(path), because importing it in the driver alone is not enough.
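A minimal sketch of that fix, assuming the two-file layout from the question (the path argument is simply wherever hound_base.py resolves on the driver machine):

    # hound.py -- sketch of the fix
    import os
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("hound").getOrCreate()

    # Ship hound_base.py to every worker so that functions pickled from it can be
    # deserialized there; importing the module on the driver alone is not enough.
    spark.sparkContext.addPyFile(os.path.join(os.path.dirname(__file__), "hound_base.py"))

    from hound_base import hound_base_func

The same effect can be achieved at submit time with spark-submit --py-files hound_base.py.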

Those changes resolved the (many) occurrences of this error that I ran into. Note that because of lazy evaluation, the error is only raised when .collect() is called. Not fun.

Regarding "python - ModuleNotFoundError in PySpark worker on rdd.collect()", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50865093/
