python - 如何覆盖 celery 任务的后端

标签 python celery celery-task

我们使用 Redis 作为结果后端。然而,对于一项任务,我们想覆盖它以改用 RabbitMQ。

Task.backend 的文档说:

The result store backend to use for this task. Defaults to the CELERY_RESULT_BACKEND setting

所以我假设我们可以将 Task.backend 设置为与 CELERY_RESULT_BACKEND 接受的格式相同的字符串。

所以我试试这个:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...

但是 worker 失败了:

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'

最佳答案

文档不正确。 Task.backend 实际上是 celery.backends 后端类的一个实例。在这种情况下,要覆盖任务类,我必须这样做:

from celery.backends.amqp import AMQPBackend

@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
    ...

但是工作人员继续使用默认类并且似乎没有提供覆盖它的方法。

关于python - 如何覆盖 celery 任务的后端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30102207/

相关文章:

python - celery : @shared_task and non-standard BROKER_URL

python - 查询期间失去与 MySQL 服务器的连接

python - 将列表的副本传递给 python 中的函数

python - python中的最小子数组差异

python - django/celery - celery 状态 : Error: No nodes replied within time constraint

python - 当我将 Django Celery apply_async 与 eta 一起使用时,它会立即完成工作

Python string.split for循环中的多个值

python - 如何让 Celery 返回一个 json 对象而不是 bytea?

django - celery 任务 .get() 不工作

django - 在 django 中使用 celery 和 ffmpeg 转码视频