这不是问题,而是对那些发现 celery 4.0.1 文档中描述的周期性任务声明很难集成到 django 中的人有所帮助: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
复制粘贴celery配置文件main_app/celery.py
:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
问题
但是如果我们使用 django 并且我们的任务放在另一个应用程序中怎么办?使用 celery 4.0.1
我们不再有 @periodic_task
装饰器。那么让我们看看我们能做什么。
第一个案例
如果您希望让任务和它们的日程彼此接近:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'))
@celery_app.task
def test(arg):
print(arg)
我们可以在 Debug模式下运行beat
:
celery -A main_app beat -l debug
我们会看到没有这样的周期性任务。
第二种情况
我们可以尝试像这样在配置文件中描述所有周期性任务:
main_app/celery.py
...
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
from main_app.some_app.tasks import test
sender.add_periodic_task(10.0, test.s('hello'))
...
结果是一样的。但它的行为与您通过 pdb 手动调试时看到的不同。在第一个示例中,根本不会触发 setup_periodic_tasks
回调。但在第二个示例中,我们将得到 django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
(此异常不会被打印出来)
最佳答案
对于 django,我们需要使用另一个信号:@celery_app.on_after_finalize.connect
。它可以同时用于:
- 在
app/tasks.py
中声明接近任务的任务计划,因为此信号将在导入所有tasks.py
并且所有可能的接收者都已订阅(首先例)。 - 集中调度声明,因为 django 应用程序已经初始化并准备好导入(第二种情况)
我想我应该写下最终声明:
第一个案例
接近任务的任务调度声明:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app
@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'))
@celery_app.task
def test(arg):
print(arg)
第二种情况
配置文件 main_app/celery.py
中的集中调度声明:
...
app = Celery()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
from main_app.some_app.tasks import test
sender.add_periodic_task(10.0, test.s('hello'))
...
关于python - 在 Django 中连接新的 celery 周期性任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41119053/