python - 用于异步任务的 celery 任务注册表

标签 python asynchronous rabbitmq celery ipc

我不知道如何以模块化方式定义 celery 任务(即 不是 同一文件中的所有任务)并正确注册它们以供异步使用。我已经尝试了所有我能想到的选项:

  • 使用基于装饰器的任务
  • 使用基于类的任务
  • 在 celery 应用对象上使用 autodiscover_tasks
  • 手动注册任务,即 app.tasks.register(Task1()) 用于基于类的任务
  • 在与 celery_app.py 相同的目录中执行 celery worker
  • 在包含 celery_app.py 的目录上执行 celery worker
  • 使用 -A 选项为 celery worker 命令指定应用程序。
  • 为 celery worker 指定应用
  • 在基于类的任务中指定一个空的“名称”属性。
  • 指定包含正确模块的名称属性

无论我做什么,我总是以任务注册表抛出“KeyError”而告终,但在使用 apply_async 执行时。同步版本总是工作正常。

如果有人可以提示我应该如何解决此问题,请分享。

这是一个最小的例子:

  • 极简
    • 任务1
      • __init__.py
      • task.py
    • 任务2
      • __init__.py
      • 任务.py
    • __init__.py
    • celery_app.py
    • start.sh
    • test.py

minimal.task1.task

# -*- coding: utf-8 -*-
from celery import Task
from minimal2.celery_app import app
class Task1(Task):
   name = ""
   def run(self, number):
       return number / 2.0

app.tasks.register(Task1())

minimal.task2.task

# -*- coding: utf-8 -*-
from celery import Task
from minimal2.celery_app import app

class Task2(Task):
    name = "minimal2.task2.task.Task2"
    def run(self, number):
        return number * number

app.tasks.register(Task2())

minimal2.celery_app

# -*- coding: utf-8 -*-
from celery import Celery

app = Celery('minimal', backend='amqp', broker='amqp://')
app.autodiscover_tasks(['task1', 'task2'], 'task')

minimal2/start.sh

#!/bin/bash
set -e

start_celery_service() {
    name=$1
    pid_file_path="$(pwd)/${name}.pid"
    if [ -e "${pid_file_path}" ] ; then
        kill $(cat ${pid_file_path}) && :
        sleep 3.0
        rm -f "${pid_file_path}"  # just in case the file was stale
    fi
    celery -A minimal2.celery_app.app worker -l DEBUG --pidfile=${pid_file_path} --logfile="$(pwd)/${name}.log" &
    sleep 3.0
}

prev_dir=$(pwd)
cd "$(dirname "$0")"
cd ../
rabbitmq-server &
start_celery_service "worker1"
cd $prev_dir

测试

from minimal2.task1.task import Task1
print Task1().apply(args=[], kwargs={'number':2}).get()
> 1.0
print Task1().apply_async(args=[], kwargs={'number':2}).get() # (first time: never comes back -> hitting ctrl-c)
print Task1().apply_async(args=[], kwargs={'number':2}).get() # second time
 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 194, in get
     on_message=on_message,
   File "/usr/local/lib/python2.7/site-packages/celery/backends/base.py", line 470, in wait_for_pending
     return result.maybe_throw(propagate=propagate, callback=callback)
   File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 299, in maybe_throw
     self.throw(value, self._to_remote_traceback(tb))
   File "/usr/local/lib/python2.7/site-packages/celery/result.py", line 292, in throw
     self.on_ready.throw(*args, **kwargs)
   File "/usr/local/lib/python2.7/site-packages/vine/promises.py", line 217, in throw
     reraise(type(exc), exc, tb)
   File "<string>", line 1, in reraise
 celery.backends.base.NotRegistered: ''

#.. same spiel with Task2:
#..
> celery.backends.base.NotRegistered: 'minimal2.task2.task.Task2'

#.. same if I do name = __name__ in Task2:
#..
> celery.backends.base.NotRegistered: 'minimal2.task2.task'

# autodiscover had no effect

我在 Ubuntu 中的 docker 容器和 macOS 中有相同的行为,两者都在 Pypy 上的最新 celery 版本上:

celery report

software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:2.7.13
            billiard:3.5.0.3 py-amqp:2.2.2
platform -> system:Darwin arch:64bit imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

最佳答案

如果我正确理解了这个问题,您可以在创建 celery 应用程序的地方使用 include 参数。它将注册在 include 参数中提到的模块中找到的所有任务。例如:

celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    CELERY_RESULT_BACKEND=app.config['CELERY_BROKER_URL'],
                    include=['minimal.task1', 'minimal.task2'])

由问题发布者编辑:另外,为了获得正确的导入命名,任务类的名称属性需要设置如下:

class Task1(Task):
   name = __name__

本质上,任务注册时name的值需要与客户端导入任务的名称完全匹配。

关于python - 用于异步任务的 celery 任务注册表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47050010/

相关文章:

python - 在Python中使用string.strip()提取特定列

c# - ExcelAsyncUtil.Observe - 在 Excel 中创建运行时钟

c# - RabbitMQ C# 驱动程序停止接收消息

ssl - Kubernetes 上的 RabbitMQ kubernetes.default.svc.cluster.local tls qlert

python - 命名建议 "subtract absolute"类型函数

python - 从 Twisted 中的 react 器中移除客户端

javascript - 如何知道 npm `unzip` 模块何时完成解压缩文件?

php - 通过rabbitmq(amqp协议(protocol))从云端向设备物联网中心发送消息时遇到问题

python mysql连接器多语句

javascript - async/await 链何时停止?