我需要每秒(比如说)调用一个任务来轮询硬件上的一些传感器数据。在单元测试中,我想做的就是检查是否调用了正确的方法,以及是否捕获了错误(例如传感器已爆炸或消失)。
这是一个模仿真实代码的玩具示例:
import pytest
import asyncio
import mock
async def ook(func):
while True:
await asyncio.sleep(1)
func()
@pytest.mark.asyncio
async def test_ook():
func = mock.Mock()
await ook(func)
assert func.called is True
正如预期的那样,运行它会永远阻塞。
如何取消 ook
任务,使单元测试不阻塞?
解决方法是将循环拆分为另一个函数并将其定义为不可测试。我想避免这样做。另请注意,弄乱 func
(调用 loop.close()
或类似的)也不起作用,因为它只是为了玩具示例测试可以断言某些东西。
最佳答案
就目前而言,您设计 ook
方法的方式是导致问题的原因。
由于 ook
方法,它总是一个阻塞操作。我假设,因为您使用的是 asyncio
,所以您希望 ook
在主线程上是非阻塞的?
如果是这样的话,asyncio
实际上带有一个内置的事件循环,参见 this comment for an example. ,它将在另一个线程上运行一个任务,并为您提供控制该任务的方法。
事件循环的文档/示例是 here
关于python - 在 asyncio 中测试一个永远运行的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45483724/