django - 为什么 Celery 任务测试结果不一致?

标签 django celery celery-task

我已经为两个 Celery 任务编写了两个简单的集成测试,但是 当我运行它们时,我得到不一致的结果。我可以运行它们一分钟 一个或两个都会通过,然后运行它们几秒钟和一个 否则两者都会失败。为什么这些结果与一个结果不一致 试运行下一个?另外,这些测试实际上是在测试 Celery 任务是否被发送到队列并由工作人员执行?

谢谢!

以下是任务:

# apps/photos/tasks.py
from __future__ import absolute_import
from conf.celeryapp import app

@app.task
def hello():
    return 'Hello world!'

@app.task
def add(x, y):
    return x + y

以下是测试:

# apps/photos/tests/task_tests.py
from django.test import TestCase
from django.test.utils import override_settings
from apps.photos.tasks import hello, add

class TaskTestIT(TestCase):
    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')

    def test_hello(self):
        result = hello.delay()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')

    def test_add(self):
        result = add.delay(1, 1)
        self.assertTrue(result.successful())
        self.assertEquals(result.get(), 2)

我使用以下命令运行测试:

./manage.py test -s

我使用 django-nose 作为我的测试运行器:

# conf/settings/base.py
USE_DJANGO_NOSE = True
if USE_DJANGO_NOSE:
    INSTALLED_APPS += ( 'django_nose', )
    TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'

这是我的 Celery 应用程序和配置文件:

# conf/celeryapp.py
from celery import Celery

app = Celery('celeryapp')
app.config_from_object('conf.celeryconfig')
app.autodiscover_tasks(['apps.photos'])

# conf/celeryconfig.py
from kombu import Queue, Exchange

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('photos', Exchange('photos'), routing_key='photos'),
    Queue('mail', Exchange('mail'), routing_key='mail'),
)
CELERY_ROUTES = (
    {'apps.photos.tasks.hello': {'queue': 'default', 'routing_key': 'default'}},
    {'apps.photos.tasks.add': {'queue': 'photos', 'routing_key': 'photos'}},
    {'apps.photos.tasks.send_email': {'queue': 'mail', 'routing_key': 'mail'}},
)

最佳答案

Task.delay()不返回任务的实际结果,它返回 AsyncResult对象,其中将包含任务执行时的结果。结果不同是因为有时任务的执行速度比测试开始检查其结果的速度要快,有时则需要更长的时间。这取决于您的系统负载等。

您应该做的是调用result.get()首先等待任务完成执行,只有在这之后你才应该检查它是否是 .successful() 等。

例如,以下内容应产生一致的结果:

def test_hello(self):
        result = hello.delay()
        result.get()
        self.assertTrue(result.successful())
        self.assertEqual(str(result.result), 'Hello world!')

关于django - 为什么 Celery 任务测试结果不一致?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30035155/

相关文章:

Django celery 和 celery-beat 守护脚本错误

python - Celery 将命令行参数传递给 Task

django - MultipleObjectsReturned at/contact-agent/get() 返回了多个属性 - 它返回了 2

来自相关模型的 django 管理显示字段

django - 如何在 Django 中获取查询集中的倒数第二条记录?

python - 使用 URL 中的字段在 Django Rest Framework 的 ListAPIView 中进行过滤

python - Celery 连接错误

python - CANT_REREAD : Format string in Celery . %h

docker - Celery beat + redis with password抛出No Auth异常

python - 使用 pytest 同时运行 celery 任务