python - 如何使用 worker 初始化时使用的变量启动 celery worker

标签 python celery

是否可以选择在启动时将变量传递给 celery worker,并在执行时在 worker 内部使用它?

我正在编写负责机器学习训练和评估的服务器。我想动态启动 worker 的新实例并将变量传递给它将用于在内部加载特定模型。

我从答案 here 中找到了如何启动 worker使用 worker_main 方法。

我在考虑两种解决方案:

  1. 设置为环境变量。此解决方案的问题在于,当同时创建两个 worker 实例时,它可能会被破坏。

  2. 将其作为 argv 传递,但我不知道如何读取 worker 中的变量。


编辑

我找到了 this线程,但它只讨论在任务中访问自定义参数。我的问题是关于在工作人员初始化时访问它。

灵感来自 this线程我会尝试使用 celery 信号。 http://docs.celeryproject.org/en/latest/userguide/signals.html#worker-init

最佳答案

也许我的问题不够准确,但我自己通过文档和 stackoverflow 线程找到了答案。

我想为 Keras 模型运行单独的 worker。在 worker 初始化中,我需要将模型加载到内存中,在任务中,模型用于预测。

我的解决方案:

  1. model_id 命名 worker(因为 id 是唯一的,每个模型只需要一个 worker)
  2. celeryd_after_setup 信号函数中,我解析了名称并在 worker 中设置了全局变量
  3. worker_process_init 信号函数上,我加载了模型,在我的例子中,它是 Grasper 类中的静态字段
  4. 在任务中我使用了 Grasper 类的静态字段

下面是一些准确描述解决方案的代码。

from celery.signals import worker_process_init, celeryd_after_setup
from celery.concurrency import asynpool

# my custom class containing static fields for model and tokenizer
# it also can be global variable as model_id
from myapp.ml import Grasper

# set to have some time for model loading otherwise worker_process_init can terminate
asynpool.PROC_ALIVE_TIMEOUT = 100.0
model_id = None

@celeryd_after_setup.connect()
def set_model_id(sender, instance, **kwargs):
    global model_id
    model_id = instance.hostname.split('@')[1]

@worker_process_init.connect()
def configure_worker(signal=None, sender=None, **kwargs):
    Grasper.load_model(model_id)

然后在 celery 任务中,您可以使用带有加载模型的 Grasper 类。 该解决方案有效,但我知道还有改进的地方,所以如果您有任何想法,请发表评论。

关于python - 如何使用 worker 初始化时使用的变量启动 celery worker ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53889613/

相关文章:

python - 加入/合并具有相同列名的数据框的微妙问题

php - 如何在 python 和 php 中使用 ZMQ 高效地发送单个消息

python - 'module' 对象没有属性 'SortedDict' Django Tastypie 错误

Django-通过 celery 从模型执行任务

python - Django Celery 任务没有完成并且一直处于待处理状态

python - flask 错误 : Unable to load celery application

python - 在Mac上使用Python监听键盘并遇到AttributeError : CFMachPortCreateRunLoopSource

python - 在没有安装权限的服务器上使用 Twisted?

python - celery :陷入无限重复超时(超时等待 UP 消息)

python - Django:celery 任务不使用 .delay() 执行