python - 在 Django 中连接新的 celery 周期性任务

标签 python django celery

这不是问题,而是对那些发现 celery 4.0.1 文档中描述的周期性任务声明很难集成到 django 中的人有所帮助: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries

复制粘贴celery配置文件main_app/celery.py:

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

问题

但是如果我们使用 django 并且我们的任务放在另一个应用程序中怎么办?使用 celery 4.0.1 我们不再有 @periodic_task 装饰器。那么让我们看看我们能做什么。

第一个案例

如果您希望让任务和它们的日程彼此接近:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

我们可以在 Debug模式下运行beat:

celery -A main_app beat -l debug

我们会看到没有这样的周期性任务。

第二种情况

我们可以尝试像这样在配置文件中描述所有周期性任务:

main_app/celery.py

...
app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...

结果是一样的。但它的行为与您通过 pdb 手动调试时看到的不同。在第一个示例中,根本不会触发 setup_periodic_tasks 回调。但在第二个示例中,我们将得到 django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.(此异常不会被打印出来)

最佳答案

对于 django,我们需要使用另一个信号:@celery_app.on_after_finalize.connect。它可以同时用于:

  • app/tasks.py 中声明接近任务的任务计划,因为此信号将在导入所有 tasks.py 并且所有可能的接收者都已订阅(首先例)。
  • 集中调度声明,因为 django 应用程序已经初始化并准备好导入(第二种情况)

我想我应该写下最终声明:

第一个案例

接近任务的任务调度声明:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

第二种情况

配置文件 main_app/celery.py 中的集中调度声明:

...

app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...

关于python - 在 Django 中连接新的 celery 周期性任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41119053/

相关文章:

python:带有 BeautifulSoup 的 Google 搜索抓取工具

python - 延迟加载属性

python - 根据第一个字符将文件移动到相应的文件夹中 (Python)

python - 调试Django时没有显示调试信息 | Django

celery - 监视 celery 队列待处理任务(有或没有花)

Python Streamlit - 在不重新运行整个脚本的情况下过滤 Pandas 数据框

python - Django 中唯一标识多对多关系

python - Django - TemplateDoesNotExist 在

django - DJCelery未将任务结果存储在Django SQLite DB中

python - SQLAlchemy 在 celery 任务中返回陈旧数据