python - Elephas not loaded in PySpark: No module named elephas.spark_model

Tags: python apache-spark pyspark keras distributed-computing

I am trying to distribute Keras training on a cluster using Elephas. But when I run the basic example from the Elephas documentation (https://github.com/maxpumperla/elephas):

```
from elephas.utils.rdd_utils import to_simple_rdd
from elephas.spark_model import SparkModel
from elephas import optimizers as elephas_optimizers

rdd = to_simple_rdd(sc, x_train, y_train)
sgd = elephas_optimizers.SGD()
spark_model = SparkModel(sc, model, optimizer=sgd, frequency='epoch',
                         mode='asynchronous', num_workers=2)
spark_model.train(rdd, nb_epoch=epochs, batch_size=batch_size,
                  verbose=1, validation_split=0.1)
```

I get the following error:

 ImportError: No module named elephas.spark_model



```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 58, xxxx, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/xx/xx/hadoop/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/worker.py", line 163, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/xx/xx/hadoop/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/yarn/local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/yarn//local/usercache/xx/appcache/application_151xxx857247_19188/container_1512xxx247_19188_01_000009/pyspark.zip/pyspark/serializers.py", line 454, in loads
    return pickle.loads(obj)
ImportError: No module named elephas.spark_model

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

Also, the model is actually created: I can do print(spark_model) and get <elephas.spark_model.SparkModel object at 0x7efce0abfcd0>. The error occurs during spark_model.train.
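A plausible explanation (my reading of standard PySpark behavior, not stated in the post): constructing SparkModel runs only on the driver, where elephas is importable, while .train() ships pickled task closures to the executors; unpickling on a worker re-imports elephas.spark_model and fails if the package is missing there. A minimal sketch of that failure mode, using a hand-built pickle that references a module that does not exist locally:

```python
import pickle

# A protocol-0 pickle whose GLOBAL opcode ('c') references a nonexistent
# module -- analogous to an executor unpickling a task that refers to
# elephas.spark_model on a node where elephas was never installed.
payload = b"cno_such_module\nSomeClass\n."

try:
    pickle.loads(payload)
except ImportError as e:
    print("unpickling failed:", e)
```

This is why the driver-side print(spark_model) succeeds while the first action that actually runs tasks on the executors blows up.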

I installed elephas with pip2 install git+https://github.com/maxpumperla/elephas; maybe that is related.
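Since pip2 install only installs elephas on the machine where it is run, a quick diagnostic (hypothetical, not from the original post) is to run the failing import inside a Spark task and see which hosts can resolve it:

```python
import socket

def check_elephas(_):
    """Attempt the failing import and report the host it ran on."""
    try:
        import elephas.spark_model  # noqa: F401
        return ["%s: ok" % socket.gethostname()]
    except ImportError as e:
        return ["%s: %s" % (socket.gethostname(), e)]

# With a live SparkContext, run one check per partition, e.g.:
# print(sc.parallelize(range(4), 4).mapPartitions(check_elephas).collect())
```

If the collected list shows "No module named elephas..." for the worker hosts, the problem is purely environment distribution, not Elephas itself.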

I use PySpark 2.1.1, Keras 2.1.4 and Python 2.7. I tried running it with spark-submit:

PYSPARK_DRIVER_PYTHON=`which python` spark-submit --driver-memory 1G filename.py

and also directly in a Jupyter notebook. Both lead to the same problem.

Can anyone give me a pointer? Is this Elephas-related or a PySpark problem?

Edit: I also uploaded a zip of the virtual environment and referenced it when submitting the script:

virtualenv spark_venv --relocatable
cd spark_venv 
zip -qr ../spark_venv.zip *

PYSPARK_DRIVER_PYTHON=`which python` spark-submit --driver-memory 1G --py-files spark_venv.zip filename.py

Then in the script I do:

sc.addPyFile("spark_venv.zip")

After this, keras is imported without any problem, but I still get the elephas error above.
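One likely reason the zipped virtualenv does not help (an assumption on my part; keras may simply have been installed on the worker nodes already): sc.addPyFile puts the zip itself on sys.path, but a zipped virtualenv keeps its packages buried under lib/pythonX.Y/site-packages/ inside the archive, not at the zip root, so Python's zip importer never finds them. A self-contained illustration:

```python
import os
import sys
import tempfile
import zipfile

# Build a zip laid out like a zipped virtualenv: the package sits under
# lib/python2.7/site-packages/ instead of at the zip root.
tmp = tempfile.mkdtemp()
zpath = os.path.join(tmp, "venv_like.zip")
with zipfile.ZipFile(zpath, "w") as z:
    z.writestr("lib/python2.7/site-packages/mypkg/__init__.py", "VALUE = 1")

sys.path.insert(0, zpath)  # effectively what sc.addPyFile(zip) does
try:
    import mypkg  # fails: nothing importable at the zip root
except ImportError:
    print("not importable from zip root")

# Pointing sys.path *inside* the archive does work:
sys.path.insert(0, os.path.join(zpath, "lib/python2.7/site-packages"))
import mypkg
print(mypkg.VALUE)
```

So shipping the venv zip via --py-files/addPyFile only works for packages placed at the archive root; the accepted answer below sidesteps this by shipping the whole venv as an archive and running its interpreter directly.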

Best Answer

I found the solution for how to properly load a virtual environment onto the master and all worker nodes:

virtualenv venv --relocatable
cd venv 
zip -qr ../venv.zip *

PYSPARK_PYTHON=./SP/bin/python spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./SP/bin/python --driver-memory 4G --archives venv.zip#SP filename.py
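For reference, a breakdown of what the pieces of this command do (my reading of the Spark-on-YARN documentation, not part of the original answer; it also assumes elephas and keras were pip-installed into the venv before zipping):

```shell
# --archives venv.zip#SP       ship the zipped venv to every node; YARN
#                              unpacks it and symlinks it as ./SP inside
#                              each container's working directory
# PYSPARK_PYTHON=./SP/bin/python
#                              executors use the interpreter from the
#                              unpacked venv, so elephas resolves there
# --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./SP/bin/python
#                              in cluster deploy mode the driver runs in
#                              the application master, so it needs the
#                              same interpreter setting
```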

More details in the GitHub issue: https://github.com/maxpumperla/elephas/issues/80#issuecomment-371073492

Original question on Stack Overflow: https://stackoverflow.com/questions/49132151/
