我有使用线性回归的机器学习模型。我有 5 个虚拟机的 Spark 集群。训练模型后,我想保存模型,以便之后只需加载到内存中即可使用。
我试过用
model.save("/tmp/model.pkl").
像这样保存时,它会在集群的所有节点中创建名为 model.pkl 的目录,其中包含文件 data/、metadata/、_SUCCESS、._SUCCESS.crc、_temporary、.. 等等
有没有办法将模型保存为单个文件,如 model.pkl
?
此外,当我使用新的可用数据重新训练模型时,我正在使用 model.write().overwrite().save("/tmp/model.pkl")
覆盖现有模型,所以新的更新模型被保存在文件系统中。
但随后我得到异常 FileAlreadyExistsException
An error occurred while calling o94.save.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/tmp/cat_model.pkl/metadata already exists
at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1119)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:278)
at org.apache.spark.ml.regression.LinearRegressionModel$LinearRegressionModelWriter.saveImpl(LinearRegression.scala:540)
at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:114)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
如何覆盖现有模型?
我对集群的所有节点上的目录 /tmp
都有写权限。
尝试使用 model.load('/tmp/model.pkl')
加载模型时,
我收到错误
An error occurred while calling o94.load.
: java.lang.UnsupportedOperationException: empty collection
看来,save(path)
没有正确保存模型。
如何正确加载保存的模型。
在 Spark 中保存和再次加载学习模型的正确方法是什么?
最佳答案
TL;DR 在集群中使用分布式文件系统。
Is there a way to save the model as single file like model.pkl?
事实并非如此。输出中的不同文件与模型的不同组件相关。
Also when I retrain the model using newly available data, I am using model.write().overwrite().save("/tmp/model.pkl") to overwrite the existing models, so new updated model be persisted in filesystem (...) then I get exception as FileAlreadyExistsException
一般来说,您不应该使用本地文件系统来写入集群。虽然写入可能会部分成功(请注意,_temporary
目录没有像分布式文件系统那样被正确删除),但在这种情况下无法加载回数据,因为执行者会看到不一致的状态文件系统。
关于python - 将机器学习模型保存/覆盖为 spark 集群中的单个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46553933/