我正在尝试使用 spark-sklearn
库在 Spark 集群上执行网格搜索。出于这个原因,我在我的 bash
shell 中运行 nohup ./spark_python_shell.sh > output.log &
来启动 Spark 集群,我也让我的 python 脚本运行(见下文 spark-submit\--master yarn 'rforest_grid_search.py'
):
SPARK_HOME=/u/users/******/spark-2.3.0 \
Q_CORE_LOC=/u/users/******/****** \
ENV=local \
HIVE_HOME=/usr/hdp/current/hive-client \
SPARK2_HOME=/u/users/******/spark-2.3.0 \
HADOOP_CONF_DIR=/etc/hadoop/conf \
HIVE_CONF_DIR=/etc/hive/conf \
HDFS_PREFIX=hdfs:// \
PYTHONPATH=/u/users/******/******/python-lib:/u/users/******/******/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
PYSPARK_PYTHON=/usr/bin/python2.7 \
QQQ_LOC=/u/users/******/three-queues \
spark-submit \
--master yarn 'rforest_grid_search.py' \
--executor-memory 10g \
--num-executors 8 \
--executor-cores 10 \
--conf spark.port.maxRetries=80 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.default.parallelism=6000 \
--conf spark.sql.shuffle.partitions=6000 \
--principal ************************ \
--queue default \
--name lets_get_starting \
--keytab /u/users/******/.******.keytab \
--driver-memory 10g
在这个 rforest_grid_search.py
python 脚本中,有以下源代码尝试将 Grid Search 与 Spark 集群连接起来:
# Spark configuration
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)
print('Spark Context:', sc)
# Hyperparameters' grid
parameters = {'n_estimators': list(range(150, 200, 25)), 'criterion': ['gini', 'entropy'], 'max_depth': list(range(2, 11, 2)), 'max_features': [i/10. for i in range(10, 16)], 'class_weight': [{0: 1, 1: i/10.} for i in range(10, 17)], 'min_samples_split': list(range(2, 7))}
# Execute grid search - using spark_sklearn library
from spark_sklearn import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
classifiers_grid = GridSearchCV(sc, estimator=RandomForestClassifier(), param_grid=parameters, scoring='precision', cv=5, n_jobs=-1)
classifiers_grid.fit(X, y)
当我运行 python 脚本时,我在 classifiers_grid.fit(X, y)
行收到错误,如下所示:
ImportError: No module named model_selection._validation
或更详细(但由于太长而没有包括所有内容)如下:
...
('Spark Context:', <SparkContext master=yarn appName=rforest_grid_search.py>)
...
18/10/24 12:43:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, oser404637.*****.com, executor 2, partition 2, PROCESS_LOCAL, 42500 bytes)
18/10/24 12:43:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, oser404637.*****.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 58, in read_command
command = serializer._read_with_length(file)
File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
return self.loads(obj)
File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
return pickle.loads(obj)
ImportError: No module named model_selection._validation
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
...
当我运行相同的 python 脚本但稍作修改(在交叉验证方面)时,我收到以下错误:
Traceback (most recent call last):
File "/data/users/******/rforest_grid_search.py", line 126, in <module>
classifiers_grid.fit(X, y)
File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 274, in fit
return self._fit(X, y, groups, ParameterGrid(self.param_grid))
File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 321, in _fit
indexed_out0 = dict(par_param_grid.map(fun).collect())
File "/u/users/******/spark-2.3.0/python/lib/pyspark.zip/pyspark/rdd.py", line 824, in collect
File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, oser402389.wal-mart.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 216, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 58, in read_command
command = serializer._read_with_length(file)
File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
return self.loads(obj)
File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
return pickle.loads(obj)
ImportError: No module named sklearn.base
如何解决此问题并在 Spark 集群上执行 GridSearchCV?
此错误是否仅仅意味着 scikit-learn
和/或 spark-sklearn
未安装在 Spark 工作节点上(即使显然它们安装在我用来连接到 Spark 集群的 Spark 边缘/驱动程序节点上)?
最佳答案
Does this error simply mean that scikit-learn and/or spark-sklearn is not installed on the Spark worker nodes
是的,它准确地表明,或者更准确地说,这些模块不存在于您的 Spark worker 使用的 Python 解释器的路径上。
一般情况下,工作端代码使用的所有模块都必须在每个节点上都可以访问。根据依赖的复杂程度,有不同的选择
- 在每个容器或容器(如果使用)中安装所有依赖项。通常更喜欢,因为没有运行时开销,并且在适用的情况下使用优化的 native 库(对于高性能机器学习至关重要)。
- 使用
pyfiles
选项在任务中分发包(通常是eggs
)。适用于不需要编译且没有 native 依赖项的简单、纯 Python 依赖项。 - 分发具有本地依赖项的完整虚拟环境(如 conda)。可以在简单的情况下工作,但开销很高(每个任务都分发大量存档),不能在混合架构集群上工作,并且使用未优化的 native 依赖项。
- 从任务中安装 Python 依赖项(如果存在 native 依赖项)- Numpy and static linking
关于python - Spark 集群上的 GridSearchCV - ImportError : No module named,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52985968/