我有一个函数,它使用 multiprocessing.Pool
并行处理一个数据集中的所有数据。
from multiprocessing import Pool
...
def func():
...
p = Pool(processes=N)
p.map(func, params)
...
但是,我现在想针对 M
个不同的数据集并行运行它。因此,我在上面现有的脚本之上编写了另一个脚本。我再次尝试使用 Pool
来创建 M
个进程(每个进程都会生成一个由 N
进程组成的 Pool
)。但是,我收到一个关于守护进程无法生成子进程的错误(这听起来非常摇滚)。因此,我阅读了一些内容,然后将 Pool
换成了不太了解的 multiprocessing.pool.ThreadPool
。所以看起来像
p = ThreadPool(processes=M)
p.starmap(func, args)
但是,当我运行此程序时,我发现 ThreadPool 一次仅处理一个数据集。那么我应该使用什么才能拥有一个生成 M
个子级的脚本,每个子级生成 N
个子级并并行执行所有操作。
最佳答案
这是 Manager.Queue()
的示例。 (不是我在评论中写的那样的监视器,我的错)。如果您去掉所有键盘中断异常处理并试图使其正常关闭,那么这确实是一个非常简单的程序,但它仍然没有做到这一点。现在您有了一个外部进程池,这些进程又生成一个池来执行任务。队列用于将任务提供给外部池工作人员,外部池工作人员又将任务提供给工作人员。他们处于无限循环中等待队列中的东西到达。
如果您想管理外部池工作人员并告诉他们,您当然也可以在那里添加控制消息(例如,如果外部池工作人员收到“退出”一词,它将关闭其池并很好地退出)做不同的事情。
from multiprocessing import Pool, Process
from time import sleep
from random import randint
from multiprocessing import Manager
import sys
alist = [1, 2, 3, 4, 5, 6, 7]
def worker(a):
try:
print a
sleep(randint(0, 2))
except KeyboardInterrupt:
pass
def outer_pool(iq, n):
_ip = Pool(processes=7)
try:
while True:
y = iq.get()
_param = []
for _ny in alist:
_param.append("%d - %d - %d" % (n, _ny, y))
_ip.map(worker, _param)
except KeyboardInterrupt:
try:
_ip.terminate()
except:
pass
c_queue = Manager().Queue()
o_processes = []
for t in alist:
p = Process(target=outer_pool, args=(c_queue, t))
p.start()
o_processes.append(p)
try:
while True:
a = randint(42,100)
c_queue.put(a)
except KeyboardInterrupt:
for _p in o_processes:
try:
_p.terminate()
except:
pass
sys.exit(0)
关于Python 池 生成池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46348589/