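python - Pandas and Spark on a cluster

Tags: python pandas apache-spark

I have a script that parses binary files and returns their data as pandas DataFrames. When I run the script in local mode, without a cluster, it works fine: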

    sc = SparkContext('local', "TDMS parser")

But when I try to set the master to my local cluster (which I started beforehand and attached workers to):

    sc = SparkContext('spark://roman-pc:7077', "TDMS parser")

it logs errors like this:

> 15/07/03 16:36:20 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
> 0, 192.168.0.193): org.apache.spark.api.python.PythonException:
> Traceback (most recent call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fef3731cd70>, ('pandas',))
> 
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)    at
> org.apache.spark.scheduler.Task.run(Task.scala:70)    at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 
> 15/07/03 16:36:20 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID
> 1) on executor 192.168.0.193:
> org.apache.spark.api.python.PythonException (Traceback (most recent
> call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fef3731cd70>, ('pandas',)) ) [duplicate 1] 15/07/03
> 16:36:20 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 2,
> 192.168.0.193, PROCESS_LOCAL, 1491 bytes) 15/07/03 16:36:20 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 3, 192.168.0.193,
> PROCESS_LOCAL, 1412 bytes) 15/07/03 16:36:20 INFO TaskSetManager: Lost
> task 0.1 in stage 0.0 (TID 3) on executor 192.168.0.193:
> org.apache.spark.api.python.PythonException (Traceback (most recent
> call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fef3731cd70>, ('pandas',)) ) [duplicate 2] 15/07/03
> 16:36:20 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 4,
> 192.168.0.193, PROCESS_LOCAL, 1412 bytes) 15/07/03 16:36:21 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on
> 192.168.0.193:40099 (size: 13.7 KB, free: 265.4 MB) 15/07/03 16:36:23 WARN TaskSetManager: Lost task 1.1 in stage 0.0 (TID 2,
> 192.168.0.193): org.apache.spark.api.python.PythonException: Traceback (most recent call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fb5c3d5cd70>, ('pandas',))
> 
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)    at
> org.apache.spark.scheduler.Task.run(Task.scala:70)    at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 
> 15/07/03 16:36:23 INFO TaskSetManager: Starting task 1.2 in stage 0.0
> (TID 5, 192.168.0.193, PROCESS_LOCAL, 1491 bytes) 15/07/03 16:36:23
> INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 4) on executor
> 192.168.0.193: org.apache.spark.api.python.PythonException (Traceback (most recent call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fb5c3d5cd70>, ('pandas',)) ) [duplicate 1] 15/07/03
> 16:36:23 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 6,
> 192.168.0.193, PROCESS_LOCAL, 1412 bytes) 15/07/03 16:36:23 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on executor
> 192.168.0.193: org.apache.spark.api.python.PythonException (Traceback (most recent call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fef3731cd70>, ('pandas',)) ) [duplicate 3] 15/07/03
> 16:36:23 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times;
> aborting job 15/07/03 16:36:23 INFO TaskSchedulerImpl: Cancelling
> stage 0 15/07/03 16:36:23 INFO TaskSchedulerImpl: Stage 0 was
> cancelled 15/07/03 16:36:23 INFO DAGScheduler: ResultStage 0 (collect
> at /home/roman/dev/python/AWO-72/tdms_reader.py:461) failed in 16,581
> s 15/07/03 16:36:23 INFO DAGScheduler: Job 0 failed: collect at
> /home/roman/dev/python/AWO-72/tdms_reader.py:461, took 17,456362 s
> Traceback (most recent call last):   File
> "/home/roman/dev/python/AWO-72/tdms_reader.py", line 461, in <module>
>     rdd.map(lambda f: read_file(f)).collect()   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py",
> line 745, in collect   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe. :
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3
> in stage 0.0 (TID 6, 192.168.0.193):
> org.apache.spark.api.python.PythonException: Traceback (most recent
> call last):   File
> "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py",
> line 98, in main
>     command = pickleSer._read_with_length(infile)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 164, in _read_with_length
>     return self.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py",
> line 421, in loads
>     return pickle.loads(obj)   File "/home/roman/dev/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py",
> line 629, in subimport
>     __import__(name) ImportError: ('No module named pandas', <function subimport at 0x7fef3731cd70>, ('pandas',))
> 
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>   at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)     at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)    at
> org.apache.spark.scheduler.Task.run(Task.scala:70)    at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:    at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>   at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at scala.Option.foreach(Option.scala:236)   at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>   at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Do you have any idea what the problem might be?

Best answer

As @Holden mentioned, I suggest checking the following:

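  1. Does the Python on your worker nodes have pandas? If not, install pandas for that Python.
  2. If multiple Python versions are installed, make sure you are using the right one, that is, the one that has pandas. You can specify which Python to use by adding the following to ./conf/spark-env.sh (copy it from ./conf/spark-env.sh.template if it does not exist yet):

    export PYSPARK_PYTHON=/Users/schang/anaconda/bin/python
    export PYSPARK_DRIVER_PYTHON=/Users/schang/anaconda/bin/ipython

Replace these paths with whichever Python installation you want to use.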
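To see which interpreter the executors actually run and whether it can import pandas, a small probe can be mapped over a throwaway RDD. This is only a sketch: `probe_module` and `probe_executors` are hypothetical helper names, not part of PySpark, and it assumes the helpers are defined in the driver script itself so that Spark ships them to the workers along with the task.

```python
import importlib
import socket
import sys


def probe_module(module_name):
    """Report whether the current interpreter can import `module_name`.

    Returns (hostname, interpreter path, found flag, version or None).
    """
    try:
        module = importlib.import_module(module_name)
        found = True
        version = getattr(module, "__version__", "unknown")
    except ImportError:
        found = False
        version = None
    return (socket.gethostname(), sys.executable, found, version)


def probe_executors(sc, module_name, num_tasks=8):
    """Run the probe in `num_tasks` Spark tasks and collect distinct reports.

    `sc` is an existing SparkContext. `num_tasks` is an arbitrary choice;
    Spark does not guarantee that every worker receives a task.
    """
    return (sc.parallelize(range(num_tasks), num_tasks)
              .map(lambda _: probe_module(module_name))
              .distinct()
              .collect())
```

With the `sc` from the question, `print(probe_executors(sc, "pandas"))` lists one tuple per distinct host and interpreter. A tuple whose third field is `False` points at a worker whose Python lacks pandas, which is the situation the `ImportError` in the log describes.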

Source: a similar question on Stack Overflow, "python - Pandas and Spark on a cluster": https://stackoverflow.com/questions/31207295/
