python - 每个 celery worker 的定期任务

标签 python celery

我正在寻找一种方法来对每个 celery worker 执行特定类型的任务。确切的用例是定期运行状况作业,确保成功任务初始化和进展的各种先决条件(这些指标报告给不同的服务)。例如,确保可以建立与数据库的连接。

我发现远程控制和检查命令可用于此目的(通过一些固定的调度),但当 AWS SQS 用作后端代理时,它们不受支持。

知道如何在不向 fork 流程任务添加任何内存占用的情况下实现这一目标吗?也许通过在工作进程中启动另一个线程?

最佳答案

为了解决这个问题,我使用了 Celery custom worker bootstep 。它在启动时注册一个计划任务,每 X 秒将一个健康检查任务忽略到工作执行池中。

该解决方案与后端代理无关,并利用定制的工作线程执行池。

class WorkerHealthMonitor(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer',
                'celery.worker.components:Pool'}

    def __init__(self, worker, **kwargs):
        self.tref = None
        self.interval = 60

    def start(self, worker):
        logger.info("Registering health monitor timer with %d seconds interval", self.interval)
        self.tref = worker.timer.call_repeatedly(
            self.interval, schedule_health_check, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref:
            self.tref.cancel()
            self.tref = None


def schedule_health_check(worker):
    worker.pool.apply_async(health_check, callback=health_check_completed)


def health_check(**kwargs):
    logger.info('Running Health Check...')
    return 'I am alive'


def health_check_completed(result):
    logger.info('Health check completed with msg: %s', result)

任务注册:

app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)
app.steps['worker'].add(WorkerHealthMonitor)

关于python - 每个 celery worker 的定期任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57346058/

相关文章:

python - Celery:组中的一个子任务总是超时

python - PyDev 无法识别 Celery 任务装饰器

python-3.x - 得到 celery.exceptions.NotRegistered 但 celery -A 应用程序检查已注册 显示我的任务已注册

python - 使用 tmpfs 改进 openCV VideoWriter

python - 如何获得 Gnome Wayland 上的事件窗口?

python - 按 id 上的排名透视 Pandas 数据框

Python - 任何将相对路径转换为绝对路径的方法?

Celery AsyncResult 获取主机名

python - ACCESS_REFUSED - 使用身份验证机制 AMQPLAIN 拒绝登录。有关详细信息,请参阅代理日志文件

python - 使用 ctypes 将 (uint8) NumPy 数组从 python 传递到 c++