python - Save/overwrite a machine learning model as a single file in a Spark cluster

Tags: python apache-spark machine-learning pyspark

I have a machine learning model that uses linear regression, and a Spark cluster of 5 VMs. After training the model, I want to save it so that later I only need to load it into memory to use it.

I tried saving it with

model.save("/tmp/model.pkl")

When saved like this, it creates a directory named model.pkl on the nodes of the cluster, containing entries such as data/, metadata/, _SUCCESS, ._SUCCESS.crc, _temporary, and so on.

Is there a way to save the model as a single file like model.pkl?

Also, when I retrain the model with newly available data, I use model.write().overwrite().save("/tmp/model.pkl") to overwrite the existing model, so that the updated model is persisted to the filesystem.

But then I get a FileAlreadyExistsException:

An error occurred while calling o94.save.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/tmp/cat_model.pkl/metadata already exists
    at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
    at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:278)
    at org.apache.spark.ml.regression.LinearRegressionModel$LinearRegressionModelWriter.saveImpl(LinearRegression.scala:540)
    at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:114)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

How can I overwrite the existing model?

I have write permission to the /tmp directory on all nodes of the cluster.

When I try to load the model with model.load('/tmp/model.pkl'), I get the error

An error occurred while calling o94.load.
: java.lang.UnsupportedOperationException: empty collection

It seems that save(path) is not saving the model correctly. How do I load the saved model properly? What is the right way to save and re-load a trained model in Spark?

Best Answer

TL;DR Use a distributed file system in the cluster.

Is there a way to save the model as a single file like model.pkl?

Not really. The different files in the output correspond to different components of the model.
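
If a single file is a hard requirement, one possible workaround (a sketch under assumptions, not something Spark's save() provides) is to pull the fitted parameters onto the driver and pickle them yourself; model_params.pkl below is an illustrative name, and the result can no longer be loaded with LinearRegressionModel.load():

import pickle

# Workaround sketch: extract the fitted parameters of a pyspark.ml
# LinearRegressionModel on the driver and write them to one local file.
# This discards Spark's metadata, so it is only useful if you intend to
# apply the weights yourself.
params = {
    "coefficients": model.coefficients.toArray(),  # weights as a numpy array
    "intercept": model.intercept,
}
with open("/tmp/model_params.pkl", "wb") as f:
    pickle.dump(params, f)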

Also, when I retrain the model using newly available data, I use model.write().overwrite().save("/tmp/model.pkl") to overwrite the existing model, so that the updated model is persisted in the filesystem (...) then I get a FileAlreadyExistsException

In general, you should not use the local file system for writes from a cluster. While the write may partially succeed (note that the _temporary directory is not removed correctly, as it would be on a distributed file system), the data cannot be loaded back in this scenario, because the executors see an inconsistent state of the file system.
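
As a minimal sketch of the recommended approach (the hdfs:///models/lr_model path is an assumption; substitute whatever distributed storage your cluster actually uses, e.g. HDFS or S3):

from pyspark.ml.regression import LinearRegressionModel

# Save to a distributed file system so every executor sees a consistent
# view; overwrite() avoids the FileAlreadyExistsException on retraining.
model.write().overwrite().save("hdfs:///models/lr_model")

# Load the model back later, e.g. in another application.
loaded = LinearRegressionModel.load("hdfs:///models/lr_model")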

This question, "python - Save/overwrite a machine learning model as a single file in a Spark cluster", corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/46553933/
