python - 特定任务后 celery 关闭 worker

标签 python celery

我正在使用 celery(并发池 = 1),我希望能够在特定任务运行后关闭工作程序。需要注意的是,我想避免 worker 在那之后再接手任何其他任务的可能性。

这是我在大纲中的尝试:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()

但是,当我运行 worker 时

celery -A celeryapp  worker --concurrency=1 --pool=solo

并运行任务

add.delay(1,4)

我得到以下信息:

 -------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f596896ce90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

任务重新排队,将在另一个工作人员上再次运行,导致循环。

当我在任务本身中移动 WorkerShutdown 异常时也会发生这种情况。

@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()

有没有一种方法可以在完成特定任务后关闭工作器,同时避免这种不幸的副作用?

最佳答案

关闭工作器的推荐过程是发送 TERM 信号。这将导致 celery worker 在完成任何当前正在运行的任务后关闭。如果你向 worker 的主进程发送 QUIT 信号,worker 将立即关闭。

然而,celery 文档通常讨论从命令行或通过 systemd/initd 管理 celery,但 celery 还通过 celery.app.control 提供远程工作控制 API。
您可以revoke阻止 worker 执行任务的任务。这应该可以防止您遇到的循环。此外,控制支持shutdown worker 也是这样。

所以我想下面的内容会让你得到你想要的行为。

@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.id) # prevent this task from being executed again
    app.control.shutdown() # send shutdown signal to all workers

由于目前无法从任务中确认任务,然后继续执行所述任务,这种使用 revoke 的方法规避了这个问题,即使任务再次排队,新 worker 会直接忽略它。

或者,以下内容也会阻止重新交付的任务再次执行...

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore() # ignore if this task was redelivered
    print('This should only execute on first receipt of task')

另外值得注意的是AsyncResult还有一个 revoke 方法为你调用 self.app.control.revoke

关于python - 特定任务后 celery 关闭 worker ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49348851/

相关文章:

python - 将 OneHotEncoder 应用于 SparkMlib 中的多个分类列

python - Python 中基于十六进制的 FizzBu​​zz

python - 将对象列表转换为 numpy 数组的更快方法

python - Django celery worker 向前端发送实时状态和结果消息

python - 如何在 celery 中将任务从一个队列移动到另一个队列

Python - 将 float 舍入为 2 位数字

python - 如何在不计算边界外数据的情况下使用 SciPy Image 应用统一过滤器?

python - celery 或 beanstalkd 或两者兼而有之?

django - 将 gevent(或 eventlet)和 prefork worker 与 Celery 一起使用

python - 如何在 Python Flask 的 celery 任务中保留请求上下文?