我正在测试多队列。我的测试涉及一个带有 getter 和 setter 的简单类,环绕多队列。
当我重新运行时,我的测试在通过和失败之间交替 [编辑:使用 block=False 时会发生这种情况。按照建议将其设置为 True 会挂起程序]。
我如何重写这个以便
(1) 清除程序执行与执行之间的多队列
(2)在.get()方法中读取队列中的所有值?
import multiprocessing
import queue
class MyClass:
def __init__(self):
self.q = multiprocessing.Queue()
self.results = []
def put(self, x):
self.q.put(x)
def get(self):
while True:
try:
self.results.append(self.q.get(block=True))
except queue.Empty:
break
return self.results
@pytest.fixture
def wrapped_queue():
yield MyClass()
def test_multiprocessing_queue(wrapped_queue):
wrapped_queue.put("a")
wrapped_queue.put("b")
result = wrapped_queue.get()
assert result == ["a", "b"]
最佳答案
这不是 pytest 的问题。队列背后发生的事情比你意识到的要多。
如果您设置 block=True
在您的 setter/getter 中,它应该可以解决问题。
关于python - Pytest + 多处理队列不能很好地协同工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52900292/