我正在使用 2 个 celery 实例。
第一个实例的配置为:
app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=18000,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_ROUTES={
'task_polling': {
'queue': 'task_polling_queue'
},
'save_shell_task': {
'queue': 'save_shell_task_queue'
},
'save_validation_chain_task': {
'queue': 'save_validation_chain_task_queue'
},
'do_work': {
'queue': 'do_work_queue'
},
'send_mail': {
'queue': 'send_mail_queue'
}
},
)
@shared_task(name='do_work', ignore_result=True)
def do_work(_serialized_task):
for bla in blala:
do_something()
is_canceled = send_task('save_validation_chain_task', [],
{'_params': my_params}).get() == True
使用以下命令启动:
celery -A tasks worker --loglevel=info -Q do_work_queue,send_mail_queue
第二个:
app = Celery()
app.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=18000,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER ='json',
CELERYBEAT_SCHEDULE={
'periodic_task': {
'task': 'task_polling',
'schedule': timedelta(seconds=1),
},
},
CELERY_ROUTES={
'task_polling': {
'queue': 'task_polling_queue'
},
'save_shell_task': {
'queue': 'save_shell_task_queue'
},
'save_validation_chain_task': {
'queue': 'save_validation_chain_task_queue'
},
'do_work': {
'queue': 'do_work_queue'
},
'send_mail': {
'queue': 'send_mail_queue'
}
},
)
@shared_task(name='save_shell_task', ignore_result=True)
def save_shell_task(_result):
ShellUpdate(_json_result=_result).to_db()
@shared_task(name='save_validation_chain_task', ignore_result=False)
def save_validation_chain_task(_result):
return ValidationChainUpdate(_json_result=_result).to_db()
此版本的启动方式为:
celery -A my_prog worker -B --concurrency=1 -P processes -Q task_polling_queue,save_shell_task_queue,save_validation_chain_task_queue
问题是 send_task(...).get()
未收到结果。程序正在循环等待。
celery 似乎没有收到队列结果或者没有等待正确的队列结果。该问题肯定是由 -Q 参数引起的。您知道配置中可能存在问题吗?
谢谢
编辑: 总体想法是拥有两个具有不同源代码的 celery 实例。这就是为什么我决定枚举队列以消除依赖性。我真的认为这就是为什么结果没有被消耗的原因,因为我无法在命令中指定队列结果,因为这个结果对于每个结果都有一个变量名称(由 celery 动态为每个结果创建的队列)。任何为 celery 实例保留两个不同源代码的解决方案对我来说都是有好处的。我想避免使用另一个结果后端,因为体积非常低。
最佳答案
您的设置和配置正确。唯一的问题是您为 do_work
任务设置了 ignore_result
。
@shared_task(name='do_work', ignore_result=True)
设置此项后,即使您的任务已由工作人员完成,任务的状态
也将始终为PENDING
。这就是为什么当您对该任务执行 .get()
时,它永远不会完成该语句的执行。
因为您只接受 json
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER ='json',
您还需要设置
CELERY_RESULT_SERIALIZER = 'json',
在你的两个配置中。
注意:
就您而言,您正在另一个任务内的一个任务上执行 .get()
。应该避免这种情况。现在它会工作得很好。来自 celery 3.2 it will raise and error instead of warining .
您可以使用chain以防止启动同步子任务(如果它们符合您的需要)。当调用 chain 时,它返回具有 Parent 属性的 async_result 对象。例如
task1 = add.s(1,2)
task2 = add.s(5)
task3 = add.s(10)
result = chain(task1 | task2 | task3)()
result.revoke(terminate=True) # revokes task3
result.parent.revoke(terminate=True) # revokes task2
result.parent.parent.revoke(terminate=True) # revokes task1
如果它们不适合,您可以使用信号来调用一些其他任务/函数。这是一个简单的例子(我没有测试过这段代码)。
from celery.signals import task_success
@app.task
def small_task():
print('small task completed')
@app.task
@task_success.connect(sender=small_task)
def big_task(**kwargs):
print('called by small_task. LOL {0}'.format(kwargs))
关于python - celery 结果,不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26934522/