python - 为什么 Python 处理完成后不加入输入和输出队列?

标签 python queue multiprocessing

这个使用 multiprocessing 的简单 Python3 程序似乎没有按预期工作。

所有输入进程共享一个输入队列,它们从中使用数据。他们都共享一个输出队列,一旦完成,他们就会在其中写入结果。我发现这个程序卡在进程 join() 上。这是为什么?

#!/usr/bin/env python3

import multiprocessing

def worker_func(in_q, out_q):
    print("A worker has started")    
    w_results = {}
    while not in_q.empty():
        v = in_q.get()
        w_results[v] = v
    out_q.put(w_results)
    print("A worker has finished")

def main():

    # Input queue to share among processes
    fpaths = [str(i) for i in range(10000)]
    in_q = multiprocessing.Queue()
    for fpath in fpaths:
        in_q.put(fpath)

    # Create processes and start them
    N_PROC = 2
    out_q = multiprocessing.Queue()
    workers = []
    for _ in range(N_PROC):
        w = multiprocessing.Process(target=worker_func, args=(in_q, out_q,))
        w.start()
        workers.append(w)
    print("Done adding workers")

    # Wait for processes to finish
    for w in workers:
        w.join()
    print("Done join of workers")

    # Collate worker results
    out_results = {}
    while not out_q.empty():
        out_results.update(out_q.get())

if __name__ == "__main__":
    main()

N_PROC = 2 时,我从这个程序中得到了这个结果:

$ python3 test.py
Done adding workers
A worker has started
A worker has started
A worker has finished
<---- I do not get "A worker has finished" from second worker
<---- I do not get "Done join of workers"

即使是单个子进程它也不起作用 N_PROC = 1:

$ python3 test.py
Done adding workers
A worker has started
A worker has finished
<---- I do not get "Done join of workers"

如果我尝试一个较小的输入队列,例如 1000 个项目,一切正常。

我知道一些旧的 StackOverflow 问题说队列有限制。为什么 Python3 文档中没有对此进行记录?

我可以使用什么替代解决方案?我想使用多处理(不是线程),将输入拆分为 N 个进程。一旦他们的共享输入队列为空,我希望每个进程收集其结果(可以是像 dict 这样的大/复杂数据结构)并将其返回给父进程。如何做到这一点?

最佳答案

这是由您的设计引起的典型错误。当 worker 终止时,它们会停止,因为它们无法将所有数据放入 out_q,从而使您的程序陷入僵局。这与队列下的管道缓冲区的大小有关。

当您使用 multiprocessing.Queue 时,您应该在尝试加入 feeder 进程之前将其清空,以确保 Process 不会停止等待所有进程要放入 Queue 中的对象。因此,在加入进程之前调用 out_q.get 应该可以解决您的问题:您可以使用哨兵模式来检测计算的结束。

#!/usr/bin/env python3

import multiprocessing
from multiprocessing.queues import Empty

def worker_func(in_q, out_q):
    print("A worker has started")    
    w_results = {}
    while not in_q.empty():
        try:
            v = in_q.get(timeout=1)
            w_results[v] = v
        except Empty:
            pass
    out_q.put(w_results)
    out_q.put(None)
    print("A worker has finished")

def main():

    # Input queue to share among processes
    fpaths = [str(i) for i in range(10000)]
    in_q = multiprocessing.Queue()
    for fpath in fpaths:
        in_q.put(fpath)

    # Create processes and start them
    N_PROC = 2
    out_q = multiprocessing.Queue()
    workers = []
    for _ in range(N_PROC):
        w = multiprocessing.Process(target=worker_func, args=(in_q, out_q,))
        w.start()
        workers.append(w)
    print("Done adding workers")

    # Collate worker results
    out_results = {}
    n_proc_end = 0
    while not n_proc_end == N_PROC:
        res = out_q.get()
        if res is None:
            n_proc_end += 1
        else:
            out_results.update(res)

    # Wait for processes to finish
    for w in workers:
        w.join()
    print("Done join of workers")

if __name__ == "__main__":
    main()

另外,请注意您的代码中存在竞争条件。队列 in_q 可以在您检查 not in_q.empty()get 之间被清空。您应该使用非阻塞获取来确保您不会死锁,等待空队列。

最后,您正在尝试实现类似于 multiprocessing.Pool 的东西,它以更稳健的方式处理这种通信。你也可以看看 concurrent.futures API,它更强大,在某种意义上,设计得更好。

关于python - 为什么 Python 处理完成后不加入输入和输出队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45345136/

相关文章:

c - 在新终端中执行子进程

python - Odoo:_get_state() 在 xml View 中至少需要 4 个参数(给定 4 个)

python - 文件中的单行太大?

python - 在 Python 中使用 Phobius 输出从序列中选择区域

添加项目时,带有超时的 Python queue.get(block=true) 不返回

java - 声明 LinkedBlockingQueue<String> 时出现 NullPointer 异常

Python 多处理池;等待迭代完成

Python/PyODBC 通过 IP 与可信连接连接到 SQL Server 2008 DB

c - 为什么在函数内部调用时 malloc 返回空指针?

c 编程实现不同应用程序的互斥