python - 如何正确关闭Python进程池?

标签 python multiprocessing python-multiprocessing concurrent.futures pathos

我有一个用例,我必须处理一些文档,这需要一些时间。因此,我尝试对文档进行批处理并对它们进行多重处理,效果很好,并且按预期在更短的时间内完成。处理文档也有多个阶段,我在所有阶段都单独使用了多重处理。当我触发多个并发请求来进行处理时,在处理了 70 多个请求后,我注意到一些进程没有被终止。

我正在使用 Locust 执行负载测试,其中创建了 5 个用户,等待时间为 4 - 5 秒,每个请求大约需要 3.5 秒,所以我尝试了多处理包和其他各种包装器(pebble、并行执行) 、悲情、并发.futures)。

我基本上做的是,

from multiprocessing import Pool

with Pool(processes=5) as p:
    out = p.starmap(do_something, args)
    p.close()
    p.terminate()

官方文档还说,在执行with时,池将在执行后关闭。当我停止请求触发时,最后一两个请求停滞不前。我通过在进程之前和之后打印“Started {req_num}”和“Served {req_num}”发现了这一点。在添加 p.close()p.terminate() 之前,我可以看到在停止触发请求后有更多进程正在运行。添加它们后,仅最后触发的进程不会被服务。现在,如果我开始触发请求并在一段时间后再次停止它们,那么最后一个或两个请求将不会得到满足,并且它们的进程会停滞不前。因此,停滞的过程不断累积。

我提到的每个包装器都有不同的关闭池的方式。我也尝试过。就像悲情一样,

p = Pool(processes=5)
out = p.map(do_something, args)
p.join()
p.close()
p.terminate()

对于concurrent.future.ThreadPoolExecutor,它是p.shutdown()。在其他所有包装中,我都面临着同样的问题。这里停滞进程的数量比 multiprocessing.Pool

中的要多

我需要帮助来找到原因或正确的方法。任何帮助将不胜感激!

最佳答案

要正确关闭池,只需调用:

Pool.close()  # terminate worker processes when all work already assigned has completed
Pool.join()  # wait all processes to terminate

关于python - 如何正确关闭Python进程池?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62581058/

相关文章:

python - 在 numpy block 上并行运行重循环

python - 多处理不均匀分配作业 - Python

python - 将数据传递给 Python multiprocessing.Pool 工作进程

python - python 3.9 上的 Typing.Optional 是否有更新,或者我做错了什么?

python - 如何借助 Scapy 从文本文件生成 Pcap 流量

python - 基于条件的新列

python - 结合 itertools 和多处理?

python - 多处理程序在 Anaconda 笔记本中有 AttributeError

Python 多处理 imap chunksize

python - 如何通过条件选择从 Pandas DataFrame 返回列列表,其中一行中的所有值都是 True?