This simple Python 3 program using multiprocessing does not seem to work as expected.
All worker processes share one input queue from which they consume data, and they all share one output queue into which each writes its results once it is done. I find that the program gets stuck at the processes' join().
Why is that?
#!/usr/bin/env python3

import multiprocessing


def worker_func(in_q, out_q):
    print("A worker has started")
    w_results = {}
    while not in_q.empty():
        v = in_q.get()
        w_results[v] = v
    out_q.put(w_results)
    print("A worker has finished")


def main():
    # Input queue to share among processes
    fpaths = [str(i) for i in range(10000)]
    in_q = multiprocessing.Queue()
    for fpath in fpaths:
        in_q.put(fpath)

    # Create processes and start them
    N_PROC = 2
    out_q = multiprocessing.Queue()
    workers = []
    for _ in range(N_PROC):
        w = multiprocessing.Process(target=worker_func, args=(in_q, out_q))
        w.start()
        workers.append(w)
    print("Done adding workers")

    # Wait for processes to finish
    for w in workers:
        w.join()
    print("Done join of workers")

    # Collate worker results
    out_results = {}
    while not out_q.empty():
        out_results.update(out_q.get())


if __name__ == "__main__":
    main()
With N_PROC = 2, I get this output from the program:
$ python3 test.py
Done adding workers
A worker has started
A worker has started
A worker has finished
<---- I do not get "A worker has finished" from second worker
<---- I do not get "Done join of workers"
It does not work even with a single child process, N_PROC = 1:
$ python3 test.py
Done adding workers
A worker has started
A worker has finished
<---- I do not get "Done join of workers"
If I try a smaller input queue, e.g. 1000 items, everything works fine.
I know some old Stack Overflow questions say that the queue has a limit. Why is this not documented in the Python 3 docs?
What alternative solution can I use? I want to use multiprocessing (not threading) and split the input among N processes. Once their shared input queue is empty, I want each process to collect its results (which can be a large/complex data structure, such as a dict) and return them to the parent process. How can this be done?
Best Answer
This is a typical bug caused by your design. When the workers terminate, they stall because they cannot flush all of their data into out_q, which deadlocks your program. This is related to the size of the pipe buffer underlying the queue.
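A minimal sketch of that deadlock mechanism (the 1 MiB payload is an arbitrary size chosen to exceed a typical pipe buffer; the exact threshold is platform-dependent):

```python
import multiprocessing


def fill(q):
    # The queue's feeder thread blocks once the underlying pipe buffer
    # is full, so the child cannot finish until someone reads from q.
    q.put("x" * (1 << 20))  # ~1 MiB payload


def drain_then_join():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=fill, args=(q,))
    p.start()
    # Calling p.join() here, before draining q, can hang forever.
    # Draining the queue first unblocks the child's feeder thread:
    data = q.get()
    p.join()
    return len(data)


if __name__ == "__main__":
    print(drain_then_join())
```

Swapping the order (join before get) reproduces the hang the question describes.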
When you use a multiprocessing.Queue, you should empty it before trying to join the feeder process, to make sure the Process does not stall waiting for all the objects to be put into the Queue. So calling out_q.get before joining the processes should solve your problem; you can use a sentinel pattern to detect the end of the computation.
#!/usr/bin/env python3

import multiprocessing
from queue import Empty


def worker_func(in_q, out_q):
    print("A worker has started")
    w_results = {}
    while not in_q.empty():
        try:
            v = in_q.get(timeout=1)
            w_results[v] = v
        except Empty:
            pass
    out_q.put(w_results)
    out_q.put(None)  # sentinel: this worker is done
    print("A worker has finished")


def main():
    # Input queue to share among processes
    fpaths = [str(i) for i in range(10000)]
    in_q = multiprocessing.Queue()
    for fpath in fpaths:
        in_q.put(fpath)

    # Create processes and start them
    N_PROC = 2
    out_q = multiprocessing.Queue()
    workers = []
    for _ in range(N_PROC):
        w = multiprocessing.Process(target=worker_func, args=(in_q, out_q))
        w.start()
        workers.append(w)
    print("Done adding workers")

    # Collate worker results: drain out_q BEFORE joining,
    # counting sentinels until every worker has reported in
    out_results = {}
    n_proc_end = 0
    while n_proc_end != N_PROC:
        res = out_q.get()
        if res is None:
            n_proc_end += 1
        else:
            out_results.update(res)

    # Wait for processes to finish
    for w in workers:
        w.join()
    print("Done join of workers")


if __name__ == "__main__":
    main()
Also, note that there is a race condition in your code: the queue in_q can be emptied between your check of not in_q.empty() and the get. You should use a non-blocking (or timed-out) get to make sure you do not deadlock waiting on an empty queue.
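That race can be avoided entirely by never polling empty(): push one sentinel per worker onto the input queue and let each worker block on get() until its sentinel arrives. A sketch of this pattern (the helper names and the identity "computation" are placeholders):

```python
import multiprocessing

SENTINEL = None


def worker(in_q, out_q):
    results = {}
    # iter(callable, sentinel) calls in_q.get() repeatedly and stops
    # when it returns SENTINEL, so there is no empty()/get() race.
    for item in iter(in_q.get, SENTINEL):
        results[item] = item
    out_q.put(results)


def run(n_proc=2, n_items=100):
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    for i in range(n_items):
        in_q.put(str(i))
    for _ in range(n_proc):
        in_q.put(SENTINEL)  # one sentinel per worker
    workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
               for _ in range(n_proc)]
    for w in workers:
        w.start()
    merged = {}
    for _ in range(n_proc):  # drain out_q before joining
        merged.update(out_q.get())
    for w in workers:
        w.join()
    return merged


if __name__ == "__main__":
    print(len(run()))
```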
Finally, what you are trying to implement resembles multiprocessing.Pool, which handles this kind of communication in a more robust way. You can also look at the concurrent.futures API, which is more powerful and, in some sense, better designed.
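For illustration, a minimal sketch of the concurrent.futures approach (the chunking scheme and the identity "computation" in process_chunk stand in for your real work):

```python
from concurrent.futures import ProcessPoolExecutor


def process_chunk(chunk):
    # Each worker builds and returns its own dict;
    # no explicitly shared queues are needed.
    return {v: v for v in chunk}


def run(fpaths, n_proc=2):
    # Split the input into one chunk per process (round-robin).
    chunks = [fpaths[i::n_proc] for i in range(n_proc)]
    merged = {}
    with ProcessPoolExecutor(max_workers=n_proc) as ex:
        # map returns the per-chunk dicts; merge them in the parent.
        for result in ex.map(process_chunk, chunks):
            merged.update(result)
    return merged


if __name__ == "__main__":
    print(len(run([str(i) for i in range(10000)])))
```

The executor handles result transfer and process shutdown itself, so the join/drain ordering problem above never arises.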
Regarding "python - Why does Python not join after the input and output queues are processed?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45345136/