我有一个用例,我必须处理一些文档,这需要一些时间。因此,我尝试对文档进行批处理并对它们进行多重处理,效果很好,并且按预期在更短的时间内完成。处理文档也有多个阶段,我在所有阶段都单独使用了多重处理。当我触发多个并发请求来进行处理时,在处理了 70 多个请求后,我注意到一些进程没有被终止。
我正在使用 Locust 执行负载测试,其中创建了 5 个用户,等待时间为 4 - 5 秒,每个请求大约需要 3.5 秒,所以我尝试了多处理包和其他各种包装器(pebble、并行执行) 、悲情、并发.futures)。
我基本上做的是,
from multiprocessing import Pool
with Pool(processes=5) as p:
out = p.starmap(do_something, args)
p.close()
p.terminate()
官方文档还说,在执行with
时,池将在执行后关闭。当我停止请求触发时,最后一两个请求停滞不前。我通过在进程之前和之后打印“Started {req_num}”和“Served {req_num}”发现了这一点。在添加 p.close()
和 p.terminate()
之前,我可以看到在停止触发请求后有更多进程正在运行。添加它们后,仅最后触发的进程不会被服务。现在,如果我开始触发请求并在一段时间后再次停止它们,那么最后一个或两个请求将不会得到满足,并且它们的进程会停滞不前。因此,停滞的过程不断累积。
我提到的每个包装器都有不同的关闭池的方式。我也尝试过。就像悲情一样,
p = Pool(processes=5)
out = p.map(do_something, args)
p.join()
p.close()
p.terminate()
对于concurrent.future.ThreadPoolExecutor
,它是p.shutdown()
。在其他所有包装中,我都面临着同样的问题。这里停滞进程的数量比 multiprocessing.Pool
我需要帮助来找到原因或正确的方法。任何帮助将不胜感激!
最佳答案
要正确关闭池,只需调用:
Pool.close() # terminate worker processes when all work already assigned has completed
Pool.join() # wait all processes to terminate
关于python - 如何正确关闭Python进程池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62581058/