python - 如何对运行 celery 任务的代码进行单元测试?

标签 python unit-testing celery pyramid pytest

我正在开发的应用是高度异步的。 Web 应用程序根据用户操作通过 celery 运行许多任务。 celery 任务本身能够启动更多任务。

如下所示的代码在我们的代码库中经常出现。

def do_sth():
    logic();
    if condition:
         function1.apply_async(*args)
    else:
         function2.apply_asynch(*args)

现在我们想开始对我们编写的任何新代码进行单元测试,但我们不确定如何进行。我们想在我们的 pytest 单元测试中断言的是我们想看看 function1 是否真的被调用了。我们不一定要运行 function1 本身,因为我们将对 function1 进行单元测试。

我不想将 celery 作为进程运行,我也不想在单元测试 session 期间运行任何 AMQP 代理。

这是可以实现的吗?

编辑

有人指出这是How do you unit test a Celery task?的副本

事实并非如此。想想看。我想问的是如何通过 apply_async 测试函数是否调用了 function1。这个问题是关于我如何测试 function1 本身。有很大的不同。在构建这个问题之前,我确实遇到了那个问题。

最佳答案

看看 unittest.mock允许用看起来与原始函数非常相似的“模拟”替换您不想调用的函数的模块,以说服调用代码它正在调用“真实的东西”。然后您可以检查模拟是否实际被调用以及使用了哪些参数。示例:

from unittest.mock import patch

def test_do_sth():
    with patch('function1.apply_async') as function1_mock:
        do_sth()
        assert function1_mock.called

关于python - 如何对运行 celery 任务的代码进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31739246/

相关文章:

python - 我们如何通过程序检查任何电子邮件ID是否可发送(是否存在)?

python - SQLite3 数据库如何合并?

django - Django 和 Celery 示例 : Periodic Tasks

python - 启动 celery worker 时出现 ValueError : not enough values to unpack (expected 3, 为 0)

javascript - 为什么需要使用 jasmine 的 runs 和 waitFor 函数?

python - 使用 celery 提交任务的最快方法?

python - Reportlab 表拆分

python - 允许多个可变参数列表?

JavaScript 单元测试和持续集成 2011

node.js - 模拟 Node.js 中测试的特定读取文件错误