python - 在 asyncio 中测试一个永远运行的任务

标签 python python-3.x pytest python-asyncio

我需要每秒(比如说)调用一个任务来轮询硬件上的一些传感器数据。在单元测试中,我想做的就是检查是否调用了正确的方法,以及是否捕获了错误(例如传感器已爆炸或消失)。

这是一个模仿真实代码的玩具示例:

import pytest
import asyncio
import mock


async def ook(func):
    while True:
        await asyncio.sleep(1)
        func()


@pytest.mark.asyncio
async def test_ook():
    func = mock.Mock()
    await ook(func)
    assert func.called is True

正如预期的那样,运行它会永远阻塞。

如何取消 ook 任务,使单元测试不阻塞?

解决方法是将循环拆分为另一个函数并将其定义为不可测试。我想避免这样做。另请注意,弄乱 func(调用 loop.close() 或类似的)也不起作用,因为它只是为了玩具示例测试可以断言某些东西。

最佳答案

就目前而言,您设计 ook 方法的方式是导致问题的原因。

由于 ook 方法,它总是一个阻塞操作。我假设,因为您使用的是 asyncio,所以您希望 ook 在主线程上是非阻塞的?

如果是这样的话,asyncio 实际上带有一个内置的事件循环,参见 this comment for an example. ,它将在另一个线程上运行一个任务,并为您提供控制该任务的方法。

事件循环的文档/示例是 here

关于python - 在 asyncio 中测试一个永远运行的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45483724/

相关文章:

python - Pytest fixture : setup, 拆解和每个测试之间运行的代码

python - 更新多条输出线

python - pd.style.apply 使用多个条件来设置数据框的样式

python - 为什么不能迭代?

python - 如何在Python中委托(delegate) `__iter__`方法

python - 为什么修补析构函数 (__del__) 对失败的测试不起作用?

python - 如何通过字典进行测试

python - 使用Python将阿拉伯语单词编号与字典中的相应编号进行匹配

python - 如何使用python在unicode中转换像,"a³ a¡ a´a§"这样的字符?

python - 从函数返回 str 和 int