当我构建并调用 Spark 模型时,预测需要数十毫秒才能返回。 然而,当我保存同一个模型,然后加载它时,预测需要更长的时间。我应该使用某种缓存吗?
model.cache()
加载后不起作用,因为模型不是 RDD。
效果很好:
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext
import time
sc = SparkContext()
# Some example data
r = [(1, 1, 1.0),
(1, 2, 2.0),
(2, 1, 2.0)]
ratings = sc.parallelize(r)
model = ALS.trainImplicit(ratings, 1, seed=10)
# Call model and time it
now = time.time()
for t in range(10):
model.predict(2, 2)
elapsed = (time.time() - now)*1000/(t+1)
print "Average time for model call: {:.2f}ms".format(elapsed)
model.save(sc, 'my_spark_model')
输出:模型调用的平均时间:71.18ms
如果我运行以下命令,预测将花费更多时间:
from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark import SparkContext
import time
sc = SparkContext()
model_path = "my_spark_model"
model = MatrixFactorizationModel.load(sc, model_path)
# Call model and time it
now = time.time()
for t in range(10):
model.predict(2, 2)
elapsed = (time.time() - now)*1000/(t+1)
print "Average time for loaded model call: {:.2f}ms".format(elapsed)
输出:加载模型调用的平均时间:180.34ms
对于 BIG 模型,加载保存的模型后,我发现单次调用的预测时间超过 10 秒。
最佳答案
简而言之:不,它似乎不是可以缓存整个模型的东西,因为它不是 RDD。
你可以尝试使用cache() , 但你不能缓存模型本身,因为它不是 RDD,所以试试这个:
model.productFeatures().cache()
model.userFeatures().cache()
推荐到unpersist()在您不需要它们之后,尤其是当您处理非常大的数据时,因为您将希望保护您的工作免受内存不足错误的影响。
当然,您可以使用persist()
代替cache()
;您可能想阅读:What is the difference between cache and persist?
请记住,Spark 会转换延迟,因此当您加载模型时什么都不会发生。它需要一个 action 来触发实际工作(即,当您真正使用 model
时,Spark 将尝试加载它,导致您遇到一些延迟,而不是将其放入内存)。
另请注意:cache()
是惰性的,因此您可以显式地使用 RDD.count()
加载到内存中。
实验的输出:
Average time for model call: 1518.83ms
Average time for loaded model call: 2352.70ms
Average time for loaded model call with my suggestions: 8886.61ms
顺便说一句,加载模型后,您应该会收到这种警告:
16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
但是如果我使用计数技巧呢?我根本不会得到任何收获,事实上我会变慢:
...
model.productFeatures().cache()
model.productFeatures().count()
model.userFeatures().cache()
model.userFeatures().count()
...
输出:
Average time for loaded model call: 13571.14ms
没有 cache()
,保留 count()
,我得到:
Average time for loaded model call: 9312.01ms
重要说明:在真实世界的集群中执行计时,节点被分配给重要的工作,所以我的玩具示例可能在实验期间被抢占了。此外,通信成本可能占主导地位。
所以,如果我是你,我也会自己进行实验。
总而言之,除此之外,Spark 似乎没有任何其他机制可用于缓存您的模型。
关于python - 如何加载 Spark 模型以进行高效预测,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39111476/