python - celery 结果,不起作用

标签 python rabbitmq celery

我正在使用 2 个 celery 实例。

第一个实例的配置为:

app = Celery('tasks', broker='amqp://guest@localhost//')

app.conf.update(
    CELERY_RESULT_BACKEND='amqp',
    CELERY_TASK_RESULT_EXPIRES=18000,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',

    CELERY_ROUTES={
        'task_polling': {
            'queue': 'task_polling_queue'
        },
        'save_shell_task': {
            'queue': 'save_shell_task_queue'
        },
        'save_validation_chain_task': {
            'queue': 'save_validation_chain_task_queue'
        },
        'do_work': {
            'queue': 'do_work_queue'
        },
        'send_mail': {
            'queue': 'send_mail_queue'
        }
    },
)

@shared_task(name='do_work', ignore_result=True)
def do_work(_serialized_task):
    for bla in blala:
        do_something()  
        is_canceled = send_task('save_validation_chain_task', [], 
                                {'_params': my_params}).get() == True

使用以下命令启动:

celery -A tasks worker --loglevel=info -Q do_work_queue,send_mail_queue

第二个:

app = Celery()

app.conf.update(
    CELERY_RESULT_BACKEND='amqp',
    CELERY_TASK_RESULT_EXPIRES=18000,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER ='json',
    CELERYBEAT_SCHEDULE={
        'periodic_task': {
            'task': 'task_polling',
            'schedule': timedelta(seconds=1),
        },
    },
    CELERY_ROUTES={
        'task_polling': {
            'queue': 'task_polling_queue'
        },
        'save_shell_task': {
            'queue': 'save_shell_task_queue'
        },
        'save_validation_chain_task': {
            'queue': 'save_validation_chain_task_queue'
        },
        'do_work': {
            'queue': 'do_work_queue'
        },
        'send_mail': {
            'queue': 'send_mail_queue'
        }
    },
)


@shared_task(name='save_shell_task', ignore_result=True)
def save_shell_task(_result):
    ShellUpdate(_json_result=_result).to_db()


@shared_task(name='save_validation_chain_task', ignore_result=False)
def save_validation_chain_task(_result):
    return ValidationChainUpdate(_json_result=_result).to_db()

此版本的启动方式为:

celery -A my_prog worker -B --concurrency=1 -P processes -Q task_polling_queue,save_shell_task_queue,save_validation_chain_task_queue

问题是 send_task(...).get() 未收到结果。程序正在循环等待。

celery 似乎没有收到队列结果或者没有等待正确的队列结果。该问题肯定是由 -Q 参数引起的。您知道配置中可能存在问题吗?

谢谢

编辑: 总体想法是拥有两个具有不同源代码的 celery 实例。这就是为什么我决定枚举队列以消除依赖性。我真的认为这就是为什么结果没有被消耗的原因,因为我无法在命令中指定队列结果,因为这个结果对于每个结果都有一个变量名称(由 celery 动态为每个结果创建的队列)。任何为 celery 实例保留两个不同源代码的解决方案对我来说都是有好处的。我想避免使用另一个结果后端,因为体积非常低。

最佳答案

您的设置和配置正确。唯一的问题是您为 do_work 任务设置了 ignore_result

@shared_task(name='do_work', ignore_result=True)

设置此项后,即使您的任务已由工作人员完成,任务的状态也将始终为PENDING。这就是为什么当您对该任务执行 .get() 时,它永远不会完成该语句的执行。

因为您只接受 json

CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER ='json',

您还需要设置

CELERY_RESULT_SERIALIZER = 'json',

在你的两个配置中。

注意:

就您而言,您正在另一个任务内的一个任务上执行 .get() 。应该避免这种情况。现在它会工作得很好。来自 celery 3.2 it will raise and error instead of warining .

您可以使用chain以防止启动同步子任务(如果它们符合您的需要)。当调用 chain 时,它返回具有 Parent 属性的 async_result 对象。例如

task1 = add.s(1,2)
task2 = add.s(5)
task3 = add.s(10)

result = chain(task1 | task2 | task3)()

result.revoke(terminate=True)                 # revokes task3
result.parent.revoke(terminate=True)          # revokes task2
result.parent.parent.revoke(terminate=True)   # revokes task1

如果它们不适合,您可以使用信号来调用一些其他任务/函数。这是一个简单的例子(我没有测试过这段代码)。

from celery.signals import task_success

@app.task
def small_task():
      print('small task completed')


@app.task
@task_success.connect(sender=small_task)
def big_task(**kwargs):
      print('called by small_task. LOL {0}'.format(kwargs))

关于python - celery 结果,不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26934522/

相关文章:

Python 全局函数,如 'print'

python - celery |发现 Flask 错误 : expected a bytes-like object, AsyncResult

python - 关闭 Celery 任务

python - 如何在运行时撤销任务

python - 解析系统日志并仅匹配最近 x 分钟内的条目

python - 使用css选择器使用scrapy在Reactjs页面上抓取嵌套标签

python - argparse 更新参数的选择

c# - Rabbitmq 中的多生产者、多消费者和单队列

rabbitmq - 每次发布后我应该关闭 channel /连接吗?

rabbitmq - 从rabbitmq断开连接的方法