我想在我的 Django 测试期间同步调用 celery 任务,而不需要运行 celery worker。为此,我在 settings.py 中指定了 CELERY_ALWAYS_EAGER=True
但它似乎不起作用。所以我决定将 override_settings 装饰器应用于如下所示的特定测试
@override_settings(CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory',
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
def test_foo(self):
...
不幸的是,这个测试仍然在我的 celery worker 中调用任务。我可以缺少什么?具体来说,我使用的是 Django 1.10 和 Celery 4.0.0。
最佳答案
在celery 4.0配置参数已经改变,
在你的测试中试试这些,
@override_settings(
task_eager_propagates=True,
task_always_eager=True,
broker_url='memory://',
backend='memory'
)
我遇到了同样的问题,解决了使用新的小写名称进行测试以及默认的 celery 设置。
这是原始设置映射的新设置,
http://docs.celeryproject.org/en/latest/userguide/configuration.html#new-lowercase-settings
celery 设置更改信息:
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names
关于python - 无法在 Django 测试中同步调用 celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40722411/