我有一个存储在 S3 中的 weka 模型,其大小约为 400MB。
现在,我有一组记录,我想在这些记录上运行模型并执行预测。
为了执行预测,我尝试过的是,
----> 不工作,如在 Weka 中执行预测,模型对象需要修改并且广播需要只读副本。
-----> 工作(效率不高,因为在每个 map 操作中,我传递了 400MB 对象)
有人知道如何在每个执行器上加载模型一次并缓存它,以便我不会再次加载其他记录吗?
最佳答案
您有两个选择:
1. 创建一个用惰性 val 表示数据的单例对象:
object WekaModel {
lazy val data = {
// initialize data here. This will only happen once per JVM process
}
}
然后,您可以在
map
中使用惰性 val。功能。 lazy val
确保每个工作 JVM 初始化他们自己的数据实例。不会为 data
执行序列化或广播. elementsRDD.map { element =>
// use WekaModel.data here
}
优势
缺点
2. 使用
mapPartition
(或 foreachPartition
)方法在 RDD 上,而不仅仅是 map
.这允许您初始化整个分区所需的任何内容。
elementsRDD.mapPartition { elements =>
val model = new WekaModel()
elements.map { element =>
// use model and element. there is a single instance of model per partition.
}
}
优势 :
缺点
关于scala - 如何在 Spark 中对每个执行器执行一次操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40015777/