python - GridSearchCV on a Spark cluster - ImportError: No module named

Tags: python apache-spark machine-learning

I am trying to run a grid search on a Spark cluster with the spark-sklearn library. To do so, I run nohup ./spark_python_shell.sh > output.log & in my bash shell to start the Spark cluster, and I also get my Python script running (see spark-submit --master yarn 'rforest_grid_search.py' below):

    SPARK_HOME=/u/users/******/spark-2.3.0 \
    Q_CORE_LOC=/u/users/******/****** \
    ENV=local \
    HIVE_HOME=/usr/hdp/current/hive-client \
    SPARK2_HOME=/u/users/******/spark-2.3.0 \
    HADOOP_CONF_DIR=/etc/hadoop/conf \
    HIVE_CONF_DIR=/etc/hive/conf \
    HDFS_PREFIX=hdfs:// \
    PYTHONPATH=/u/users/******/******/python-lib:/u/users/******/******/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
    YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
    SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
    PYSPARK_PYTHON=/usr/bin/python2.7 \
    QQQ_LOC=/u/users/******/three-queues \
    spark-submit \
    --master yarn 'rforest_grid_search.py' \
    --executor-memory 10g \
    --num-executors 8 \
    --executor-cores 10 \
    --conf spark.port.maxRetries=80 \
    --conf spark.dynamicAllocation.enabled=False \
    --conf spark.default.parallelism=6000 \
    --conf spark.sql.shuffle.partitions=6000 \
    --principal ************************ \
    --queue default \
    --name lets_get_starting \
    --keytab /u/users/******/.******.keytab \
    --driver-memory 10g

In this rforest_grid_search.py Python script, the following source code attempts to hook the grid search up to the Spark cluster:

# Spark configuration
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)
print('Spark Context:', sc)

# Hyperparameters' grid
parameters = {
    'n_estimators': list(range(150, 200, 25)),
    'criterion': ['gini', 'entropy'],
    'max_depth': list(range(2, 11, 2)),
    'max_features': [i/10. for i in range(10, 16)],
    'class_weight': [{0: 1, 1: i/10.} for i in range(10, 17)],
    'min_samples_split': list(range(2, 7))
}

# Execute grid search - using spark_sklearn library
from spark_sklearn import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
classifiers_grid = GridSearchCV(sc, estimator=RandomForestClassifier(), param_grid=parameters, scoring='precision', cv=5, n_jobs=-1)
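# X and y (the training feature matrix and label vector) are assumed to be
# prepared earlier in the script; that part is omitted here.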
classifiers_grid.fit(X, y)

When I run the Python script, I get an error at the classifiers_grid.fit(X, y) line, as follows:

ImportError: No module named model_selection._validation

Or, in more detail (though not in full, because it is too long):

...
    ('Spark Context:', <SparkContext master=yarn appName=rforest_grid_search.py>)
...
    18/10/24 12:43:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, oser404637.*****.com, executor 2, partition 2, PROCESS_LOCAL, 42500 bytes)
    18/10/24 12:43:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, oser404637.*****.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
        func, profiler, deserializer, serializer = read_command(pickleSer, infile)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 58, in read_command
        command = serializer._read_with_length(file)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
        return self.loads(obj)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
        return pickle.loads(obj)
    ImportError: No module named model_selection._validation

            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
...

When I run the same Python script with a slight modification (to the cross-validation), I get the following error instead:

Traceback (most recent call last):
  File "/data/users/******/rforest_grid_search.py", line 126, in <module>
    classifiers_grid.fit(X, y)
  File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 274, in fit
    return self._fit(X, y, groups, ParameterGrid(self.param_grid))
  File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 321, in _fit
    indexed_out0 = dict(par_param_grid.map(fun).collect())
  File "/u/users/******/spark-2.3.0/python/lib/pyspark.zip/pyspark/rdd.py", line 824, in collect
  File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, oser402389.wal-mart.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 216, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 58, in read_command
    command = serializer._read_with_length(file)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
ImportError: No module named sklearn.base

How can I fix this and run GridSearchCV on the Spark cluster?

Does this error simply mean that scikit-learn and/or spark-sklearn are not installed on the Spark worker nodes (even though they are evidently installed on the Spark edge/driver node I use to connect to the cluster)?

Best Answer

Does this error simply mean that scikit-learn and/or spark-sklearn is not installed on the Spark worker nodes

Yes, that is exactly what it means, or, more precisely, the modules are not present on the path of the Python interpreter used by your Spark workers.
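A quick way to confirm this is to run a small job that attempts the imports inside the executors and collects the results on the driver. The following is a minimal sketch, assuming it runs in the same PySpark session where sc has already been created:

    # Diagnostic sketch: each partition tries the imports on its executor and
    # reports the executor's hostname together with the outcome.
    def probe(_):
        import socket  # imported on the executor, not the driver
        results = {}
        for mod in ('sklearn', 'spark_sklearn'):
            try:
                __import__(mod)
                results[mod] = 'ok'
            except ImportError as exc:
                results[mod] = 'missing ({})'.format(exc)
        return [(socket.gethostname(), results)]

    print(sc.parallelize(range(8), 8).mapPartitions(probe).collect())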

In general, every module used by worker-side code has to be accessible on each node. Depending on how complex the dependencies are, there are several options:

  • Install all the dependencies on each node, or in the containers if you use them. Usually preferred, since there is no runtime overhead and optimized native libraries can be used where applicable (crucial for high-performance machine learning).
  • Distribute packages (typically eggs) with the tasks using the pyFiles option. Suitable for simple, pure-Python dependencies that need no compilation and have no native dependencies (a sketch follows this list).
  • Distribute a complete virtual environment (e.g. conda) including the native dependencies. This can work in simple cases, but the overhead is high (a large archive is shipped with every task), it does not work on mixed-architecture clusters, and it uses unoptimized native dependencies.
  • Install the Python dependencies from within the tasks themselves (when native dependencies are involved) - Numpy and static linking.
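For the pyFiles option above, a minimal sketch, assuming spark_sklearn has been packaged locally as a zip or egg (the archive path is a placeholder), could look like this:

    # Hypothetical sketch: ship a pure-Python dependency to the executors at
    # runtime; sc is the SparkContext created earlier in the script.
    sc.addPyFile('/path/to/spark_sklearn.zip')
    # The equivalent at submit time is spark-submit's --py-files option.

Note that scikit-learn itself (together with numpy/scipy) contains compiled extensions, so shipping it this way generally does not work; it should instead be installed on every worker node into the interpreter that PYSPARK_PYTHON points to, as in the first option.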

A similar question about python - GridSearchCV on a Spark cluster - ImportError: No module named can be found on Stack Overflow: https://stackoverflow.com/questions/52985968/
