Python 池 生成池

标签 python multithreading multiprocessing

我有一个函数,它使用 multiprocessing.Pool 并行处理一个数据集中的所有数据。

from multiprocessing import Pool
...
def func():
    ...
    p = Pool(processes=N)
    p.map(func, params)
    ...

但是,我现在想针对 M 个不同的数据集并行运行它。因此,我在上面现有的脚本之上编写了另一个脚本。我再次尝试使用 Pool 来创建 M 个进程(每个进程都会生成一个由 N 进程组成的 Pool)。但是,我收到一个关于守护进程无法生成子进程的错误(这听起来非常摇滚)。因此,我阅读了一些内容,然后将 Pool 换成了不太了解的 multiprocessing.pool.ThreadPool。所以看起来像

p = ThreadPool(processes=M)
p.starmap(func, args)

但是,当我运行此程序时,我发现 ThreadPool 一次仅处理一个数据集。那么我应该使用什么才能拥有一个生成 M 个子级的脚本,每个子级生成 N 个子级并并行执行所有操作。

最佳答案

这是 Manager.Queue() 的示例。 (不是我在评论中写的那样的监视器,我的错)。如果您去掉所有键盘中断异常处理并试图使其正常关闭,那么这确实是一个非常简单的程序,但它仍然没有做到这一点。现在您有了一个外部进程池,这些进程又生成一个池来执行任务。队列用于将任务提供给外部池工作人员,外部池工作人员又将任务提供给工作人员。他们处于无限循环中等待队列中的东西到达。

如果您想管理外部池工作人员并告诉他们,您当然也可以在那里添加控制消息(例如,如果外部池工作人员收到“退出”一词,它将关闭其池并很好地退出)做不同的事情。

from multiprocessing import Pool, Process
from time import sleep
from random import randint
from multiprocessing import Manager
import sys


alist = [1, 2, 3, 4, 5, 6, 7]


def worker(a):
    try:
        print a
        sleep(randint(0, 2))
    except KeyboardInterrupt:
        pass


def outer_pool(iq, n):
    _ip = Pool(processes=7)
    try:
        while True:
            y = iq.get()
            _param = []
            for _ny in alist:
                _param.append("%d - %d - %d" % (n, _ny, y))
            _ip.map(worker, _param)

    except KeyboardInterrupt:
        try:
            _ip.terminate()
        except:
            pass


c_queue = Manager().Queue()

o_processes = []
for t in alist:
    p = Process(target=outer_pool, args=(c_queue, t))
    p.start()
    o_processes.append(p)

try:
    while True:
        a = randint(42,100)
        c_queue.put(a)
except KeyboardInterrupt:

    for _p in o_processes:
        try:
            _p.terminate()
        except:
            pass
    sys.exit(0)

关于Python 池 生成池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46348589/

相关文章:

当进程数和工作线程数增加时,Python 多处理池 API 无法有效工作

python - 如何使用 Django 将完全渲染的 html 页面转换为 PDF

c++ - 通过两个线程递增的全局变量的最终值

java - 线程的运行没有看到它的数组大小增加

java - 线程状态java

python - multiprocessing.Pool 的 apply_async 中的工作人员是否有办法捕获错误并继续?

python - 具有多个参数的多处理映射

python - 使用 Python 抓取 Google 网页时,总是得不到足够的图像和重复的图像?

python - Pandas,验证数据,检查所有组的长度是否相同

python - Python函数定义中的正斜杠? [复制]