Python 多处理 worker /队列

标签 python multiprocessing

我有一个 python 函数,总共需要运行 12 次。我目前已将其设置为使用多处理库中的 Pool 并行运行所有这些库。通常我一次运行 6 个,因为该函数是 CPU 密集型的,并行运行 12 个通常会导致程序崩溃。当我们一次做 6 次时,第二组 6 次要等到前 6 个过程全部完成后才会开始。理想情况下,我们希望在最初一批 6 个中的一个完成后立即启动另一个(例如第 7 个)- 这样 6 个同时运行,同时还有更多要启动。现在代码看起来像这样(它会被调用两次,传递一个列表中的前 6 个元素,然后传递另一个列表中的第二个 6:

from multiprocessing import Pool

def start_pool(project_list):

    pool = Pool(processes=6)
    pool.map(run_assignments_parallel,project_list[0:6])

所以我一直在尝试实现工作人员/队列解决方案,但遇到了一些问题。我有一个看起来像这样的辅助函数:

def worker(work_queue, done_queue):
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print proj
            run_assignments_parallel(proj)
            done_queue.put('finished ' + proj )
    except Exception, e:        
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj,        e.message))
    return True

调用worker函数的代码如下:

workers = 6
work_queue = Queue()
done_queue = Queue()  
processes = []
for project in project_list:
    print project
    work_queue.put(project)
for w in xrange(workers):        
    p = Process(target=worker, args=(work_queue, done_queue))
    p.start()
    processes.append(p)
    work_queue.put('STOP')
for p in processes:
     p.join()    
     done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):        
    print status

project_list 只是需要在函数“run_assignments_parallel”中运行的 12 个项目的路径列表。

按照现在的写法,同一个进程(项目)会多次调用该函数,我真的不知道发生了什么。这段代码基于我发现的一个例子,我很确定循环结构被搞砸了。任何帮助都会很棒,对于我对此事的无知,我深表歉意。谢谢!

最佳答案

Ideally, we would like another one (e.g. the 7th) to kick off as soon as one from the initial batch of 6 is finished- So that 6 are running at once while there are more to start.

所有您需要更改的是传递所有 12 个输入参数而不是 6 个:

from multiprocessing import Pool
pool = Pool(processes=6) # run no more than 6 at a time
pool.map(run_assignments_parallel, project_list) # pass full list (12 items)

关于Python 多处理 worker /队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22235426/

相关文章:

Python 多处理map_async

linux - execv() 和 fork() 的时间浪费

python - 如何使用 multiprocessing.Queue.get 方法?

python - 对 numpy 元组进行矢量化时函数返回意外值

python - 如何禁用 Flask 中的线程?

python - np.array arr.itemsize 与 sys.getsizeof(arr[0])

python - 多处理计数器输出很奇怪,使用锁

python - django错误:1146, "Table ' basic_project.topics_topic'不存在”

python - Ubuntu Modoboa 从 NGINX 到 APACHE2 : HttpS 403 forbidden while HTTP is correctly accessible. (mod_WSGI)

python - 使用 python 多处理模块创建的子进程不会打印