python - 在工作人员上加载本地(不可序列化)对象

标签 python google-cloud-dataflow apache-beam

我正在尝试将 Dataflow 与 Tensorflow 结合使用进行预测。这些预测正在工作人员身上发生,我目前正在通过 startup_bundle() 加载模型。就像这里:

class PredictDoFn(beam.DoFn): 
    def start_bundle(self):
        self.model = load_model_from_file()
    def process(self, element):
        ...

我当前的问题是,即使我处理 1000 个元素,startup_bundle() 函数也会被调用多次(至少 10 次),而不是像我希望的那样每次工作一次。这会显着减慢管道速度,因为模型需要加载多次,并且每次需要 30 秒。

是否有任何方法可以在初始化时在工作线程上加载模型,而不是每次都在 start_bundle() 中加载模型?

提前致谢! 迪米特里

最佳答案

最简单的方法是添加一个如果 self.model 为 None:self.model = load_model_from_file(),这可能不会减少模型重新加载的次数。

这是因为 DoFn 实例当前并未跨 bundle 重用。这意味着执行每个工作项后您的模型将被遗忘

您还可以创建一个用于保存模型的全局变量。这会减少重新加载的数量,但这确实是非正统的(尽管它可能会解决您的用例)。


全局变量方法应该像这样工作:

class MyModelDoFn(object):
  def process(self, elem):
    global my_model
    if my_model is None:
      my_model = load_model_from_file()
    yield my_model.apply_to(elem)

依赖于线程局部变量的方法看起来像这样。考虑到这将在每个线程加载模型一次,因此加载模型的次数取决于运行器实现(它将在 Dataflow 中工作):

class MyModelDoFn(object):
  _thread_local = threading.local()
  @property
  def model(self):
    model = getattr(MyModelDoFn._thread_local, 'model', None)
    if not model:
      MyModelDoFn._thread_local.model = load_model_from_file()

    return MyModelDoFn._thread_local.model

  def process(self, elem):
    yield self.model.apply_to(elem)

我想您也可以从 start_bundle 调用加载模型。

注意:这种方法非常非正统,并且不能保证在新版本或所有运行器中都有效。

关于python - 在工作人员上加载本地(不可序列化)对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45744173/

相关文章:

python - SQLAlchemy 基于字典对约束进行建模

python - 尝试在 python 中将 Plotly 图保存到图像文件时出现奇怪的错误?

java - 是否可以根据窗口元素的时间戳动态生成 BigQuery 表名?

google-cloud-dataflow - 数据流是否支持自定义触发器或更新触发器延迟?

java - Apache Beam 通配符递归搜索文件

python - 在 Python 的同一个函数中使用 return 和 print

python - 在Python中解析plist文件

python - 为什么使用 "--requirements_file"将依赖项上传到 GCS?

google-cloud-dataflow - 如何在具有小捆绑的流式管道中按 N 个元素进行批处理?

python - 如何在 python 中从数据流/光束查询数据存储