我正在尝试将 Dataflow 与 Tensorflow 结合使用进行预测。这些预测正在工作人员身上发生,我目前正在通过 startup_bundle()
加载模型。就像这里:
class PredictDoFn(beam.DoFn):
def start_bundle(self):
self.model = load_model_from_file()
def process(self, element):
...
我当前的问题是,即使我处理 1000 个元素,startup_bundle()
函数也会被调用多次(至少 10 次),而不是像我希望的那样每次工作一次。这会显着减慢管道速度,因为模型需要加载多次,并且每次需要 30 秒。
是否有任何方法可以在初始化时在工作线程上加载模型,而不是每次都在 start_bundle()
中加载模型?
提前致谢! 迪米特里
最佳答案
最简单的方法是添加一个如果 self.model 为 None:self.model = load_model_from_file()
,这可能不会减少模型重新加载的次数。
这是因为 DoFn 实例当前并未跨 bundle 重用。这意味着执行每个工作项后您的模型将被遗忘。
您还可以创建一个用于保存模型的全局
变量。这会减少重新加载的数量,但这确实是非正统的(尽管它可能会解决您的用例)。
全局变量方法应该像这样工作:
class MyModelDoFn(object):
def process(self, elem):
global my_model
if my_model is None:
my_model = load_model_from_file()
yield my_model.apply_to(elem)
依赖于线程局部变量的方法看起来像这样。考虑到这将在每个线程加载模型一次,因此加载模型的次数取决于运行器实现(它将在 Dataflow 中工作):
class MyModelDoFn(object):
_thread_local = threading.local()
@property
def model(self):
model = getattr(MyModelDoFn._thread_local, 'model', None)
if not model:
MyModelDoFn._thread_local.model = load_model_from_file()
return MyModelDoFn._thread_local.model
def process(self, elem):
yield self.model.apply_to(elem)
我想您也可以从 start_bundle
调用加载模型。
注意:这种方法非常非正统,并且不能保证在新版本或所有运行器中都有效。
关于python - 在工作人员上加载本地(不可序列化)对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45744173/