我们有一个在 python 3.2 (Fedora Core 14 64b) 中运行的 Web 服务服务器,但由于新的依赖项(不支持 3.2)而被迫向后移植到 python 2.6.7。有一段代码使用并发 futures,已被重写为使用 multiprocessing.Pool 并行执行几个关键部分。代码现在看起来像这样:
import multiprocessing
def _run_threads(callable_obj, args, threads):
pool = multiprocessing.Pool(processes=threads)
process_list = [pool.apply_async(callable_obj, a) for a in args]
pool.close()
pool.join()
return [x.get() for x in process_list]
对“线程”这个名称的混淆滥用表示歉意。这些是过程。
自从实现这个功能后,我们发现它有时会挂起。当我们最终杀死父(主)进程时,我们得到一个乱码回溯;但有几行似乎很关键:
[snip]
Process PoolWorker-445:
[snip]
File "/usr/lib64/python2.6/multiprocessing/pool.py", line 59, in worker
task = get()
File "/usr/lib64/python2.6/multiprocessing/queues.py", line 352, in get
return recv()
racquire()
[snip]
在我看来,根据现有证据,池中的子进程未能从父进程接收到“关闭”信号,因此它等待工作。 parent 坐着等待 child 关闭。服务器挂起。这种情况不确定地发生,但对于这样一个关键服务器来说太频繁了(每天一次)。
是不是run_threads()函数的编码有问题?这是已知解决方法的已知问题吗?显然,我们将其用于时间关键的处理,因此除非绝对必要,否则我们不希望为顺序执行重新编码。坚持使用 multiprocessing.Pool 的原因之一是可以轻松访问并行运行的操作的返回码。
谢谢
最佳答案
我不确定这个问题的根源在哪里。这绝对是非常有趣的。但是,也许稍加重组就可以解决问题。我认为您不需要在收集结果之前终止池进程,对吗?也许坚持使用池的“规范”方式,as documented , 帮助:
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises TimeoutError
或者,在您的情况下,在关闭/加入池之前调用 x.get() for x in process_list
。如果问题仍然存在并出现在 get()
期间,我们至少知道它与 close()
无关。
关于python 2.6.7 多处理池无法关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12485322/