具有更新队列和输出队列的 Python 多处理

标签 python multiprocessing

如何编写使用两个队列的 Python 多进程脚本?:

  1. 一个作为工作队列,从一些数据开始,根据要并行化的功能的条件,即时接收更多任务,
  2. 另一个收集结果并用于在处理完成后记下结果。

我基本上需要根据我在初始项目中找到的内容,将更多任务放入工作队列中。我在下面发布的示例很愚蠢(我可以根据需要转换项目并将其直接放入输出队列中),但它的机制很清楚并且反射(reflect)了我需要开发的部分概念。

在此我的尝试:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

这不会结束也不会打印任何结果。

在整个过程结束时,我想确保工作队列为空,并且所有并行函数都已完成写入输出队列,然后再迭代输出结果。您对如何让它发挥作用有什么建议吗?

最佳答案

下面的代码达到了预期的效果。它遵循@tawmas 提出的建议。

此代码允许在一个进程中使用多个核心,该进程要求向工作人员提供数据的队列可以在处理期间由他们更新:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break #this is the so-called 'poison pill'    
        else:
            picked = working_queue.get()
            if picked % 2 == 0: 
                    output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank

关于具有更新队列和输出队列的 Python 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21609595/

相关文章:

python - 设置数据类型为 Decimal 的 Pandas 数据框列的小数精度

python - 如何在 Django 模板上实现 "back"链接?

python - 递归求幂

python - cx_Oracle : Error 933. ORA-00933 : “SQL command not properly ended” : SQL command error?

Python websockets,订阅多个 channel

c++ - Boost.进程间 : How to call and send data to a function from class that is in another process?

python - 弦乐变奏

python - 具有常量和可迭代参数的函数的多重处理

linux - 是否可以在 shell 脚本中创建非子进程?

Python 多处理 : AttributeError: Can't pickle local object