python - 如何使用 multiprocessing.Queue.get 方法?

标签 python queue multiprocessing python-multiprocessing

下面的代码将三个数字放入一个队列中。然后它尝试从队列中取回数字。但它从来没有。如何从队列中获取数据?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()

最佳答案

我最初在阅读@Martijn Pieters 后删除了这个答案,因为他更早更详细地描述了“为什么这不起作用”。然后 我意识到,OP 示例中的用例不太符合

的规范冠冕堂皇的标题

"How to use multiprocessing.Queue.get method".

那不是因为有 没有子进程参与演示,但因为在实际应用程序中几乎不会预先填充队列并且只在之后读取,但读取 写作与中间的等待时间交织在一起。 Martijn 展示的扩展演示代码在通常情况下不起作用,因为当排队跟不上读取时,while 循环会很快中断。所以这是重新加载的答案,它能够处理通常的交错提要和读取场景:


不要依赖 queue.empty 检查同步。

After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty. ...

empty()

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. docs

要么使用 for msg in iter(queue.get, sentinel): 到队列中的 .get(),在这里你通过传递一个哨兵值... iter(callable, sentinel)?

from multiprocessing import Queue

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    for msg in iter(queue.get, SENTINEL):
        print(msg)

...或使用 get_nowait() 并处理可能的 queue.Empty 异常,如果您需要非阻塞解决方案。

from multiprocessing import Queue
from queue import Empty
import time

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    while True:
        try:
            msg = queue.get_nowait()
            if msg == SENTINEL:
                break
            print(msg)
        except Empty:
            # do other stuff
            time.sleep(0.1)

如果只有一个进程并且该进程中只有一个线程正在读取队列,也可以将最后一个代码片段交换为:

while True:
    if not queue.empty():  # this is not an atomic operation ...
        msg = queue.get()  # ... thread could be interrupted in between
        if msg == SENTINEL:
            break
        print(msg)
    else:
        # do other stuff
        time.sleep(0.1)

因为线程可能会丢弃 GIL在检查 if not queue.empty()queue.get() 之间,这不适合进程中的多线程队列读取。如果多个进程正在从队列中读取,这同样适用。

不过,对于单一生产者/单一消费者场景,使用 multiprocessing.Pipe 代替 multiprocessing.Queue 就足够了,而且性能更高。

关于python - 如何使用 multiprocessing.Queue.get 方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53132498/

相关文章:

Cassandra 队列反模式解决方案

python - 多处理池映射 : AttributeError: Can't pickle local object

python - 在 wxpython 中模拟棋盘

python - Pandas 中具有多个 IF 条件的 For 循环

python - 即使已安装Ansible也无法导入docker-py(Ansible 2.3.0.0)

Python 语言对管道的支持

python - 在 python 的多处理中检查空队列

python - 在 python 中使用 pydoop 保存 gzip 文件

父导出上的 Python 多处理队列

java - 带水印的阻塞队列