我正在开发的应用是高度异步的。 Web 应用程序根据用户操作通过 celery 运行许多任务。 celery 任务本身能够启动更多任务。
如下所示的代码在我们的代码库中经常出现。
def do_sth():
logic();
if condition:
function1.apply_async(*args)
else:
function2.apply_asynch(*args)
现在我们想开始对我们编写的任何新代码进行单元测试,但我们不确定如何进行。我们想在我们的 pytest
单元测试中断言的是我们想看看 function1 是否真的被调用了。我们不一定要运行 function1
本身,因为我们将对 function1
进行单元测试。
我不想将 celery 作为进程运行,我也不想在单元测试 session 期间运行任何 AMQP 代理。
这是可以实现的吗?
编辑
有人指出这是How do you unit test a Celery task?的副本
事实并非如此。想想看。我想问的是如何通过 apply_async 测试函数是否调用了 function1。这个问题是关于我如何测试 function1 本身。有很大的不同。在构建这个问题之前,我确实遇到了那个问题。
最佳答案
看看 unittest.mock
允许用看起来与原始函数非常相似的“模拟”替换您不想调用的函数的模块,以说服调用代码它正在调用“真实的东西”。然后您可以检查模拟是否实际被调用以及使用了哪些参数。示例:
from unittest.mock import patch
def test_do_sth():
with patch('function1.apply_async') as function1_mock:
do_sth()
assert function1_mock.called
关于python - 如何对运行 celery 任务的代码进行单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31739246/