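How do I write a Python multiprocessing script that uses two queues:
- one as a working queue, which starts with some data and, depending on conditions inside the function being parallelized, receives further tasks on the fly;
- another that collects the results and is used to write them down once processing has finished.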
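Basically, I need to put more tasks into the working queue depending on what I find in the initial items. The example I post below is silly (I could transform the item as needed and put it directly into the output queue), but its mechanics are clear and reflect part of the concept I need to develop.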
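To pin down what the parallel version has to compute, here is a minimal single-process sketch of the same mechanism. It uses `collections.deque` as the working queue and a list as the output queue. This is not part of the original post, and `process_sequentially` is a hypothetical name. It applies the toy rule from the code below: even items are squared into the output, and odd items are re-queued as `item + 1`. Every input therefore produces exactly one output, which is what the length check in the accepted answer relies on.

```python
from collections import deque


def process_sequentially(items):
    """Hypothetical single-process reference for the question's toy rule."""
    working = deque(items)  # the "working queue", seeded with the initial data
    output = []             # the "output queue"
    while working:          # stop only when no work remains, including re-queued items
        item = working.popleft()
        if item % 2 == 0:
            output.append(item ** 2)   # finished item: keep the result
        else:
            working.append(item + 1)   # new task discovered while processing
    return output


print(process_sequentially(range(6)))  # [0, 4, 16, 4, 16, 36]
```

The parallel version has to reach the same end state. The working queue must be drained, including items added during processing, before the output is considered complete.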
Here is my attempt:
import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get()  # I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2)  # If I like it, I do something with it and keep the result.
    else:
        working_queue.put(item+1)  # If something is missing, I do something with it and leave the result in the working queue

if __name__ == '__main__':
    static_input = range(100)
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    # I am running as many processes as my machine has CPUs (is this wise?).
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print(result)  # alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.
This neither terminates nor prints any results.
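At the end of the whole process, I want to make sure that the working queue is empty and that all the parallel functions have finished writing to the output queue before I iterate over the results. Do you have any suggestions on how to make this work?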
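The attempt above has two problems:

- Each worker calls `get()` only once and then returns, so most of the working queue is never processed.
- The two-argument form `iter(callable, sentinel)` keeps calling `output_q.get()` until it returns the sentinel `None`. Nothing ever puts `None` on the queue, so once the real results are used up the final `get()` blocks forever.

The sketch below shows the sentinel behaviour in isolation. It uses a thread-safe `queue.Queue` as a stand-in for `mp.Queue`, on the assumption that the blocking `get()` semantics are the same for this purpose.

```python
import queue

q = queue.Queue()
for value in [1, 4, 9]:
    q.put(value)
q.put(None)  # without this sentinel, the iteration below would block forever

# iter(q.get, None) calls q.get() repeatedly and stops as soon as it returns None
results = list(iter(q.get, None))
print(results)  # [1, 4, 9]
```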
Best answer
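The code below achieves the intended result. It follows the suggestions made by @tawmas.
It lets you use multiple cores for a job in which the workers themselves must be able to add new items, during processing, to the queue that feeds them data: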
import multiprocessing as mp
import queue  # only for the queue.Empty exception raised by get(timeout=...)

def worker(working_queue, output_queue):
    while True:
        try:
            # Exit once no item arrives within one second. Checking
            # working_queue.empty() and then calling get() is racy: another
            # worker can take the last item in between, leaving get() blocked,
            # and a freshly re-queued item may not be visible to empty() yet.
            picked = working_queue.get(timeout=1)
        except queue.Empty:
            break
        if picked % 2 == 0:
            output_queue.put(picked)
        else:
            working_queue.put(picked + 1)

if __name__ == '__main__':
    static_input = range(100)
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    # Joining before draining output_q works for 100 small results; with large
    # outputs, read the results first (a process that has put items on a queue
    # does not terminate until they have been flushed to the underlying pipe).
    for proc in processes:
        proc.join()
    results_bank = []
    while not output_q.empty():
        results_bank.append(output_q.get_nowait())
    # This should equal len(static_input): every input item yields exactly one
    # result, so it tells whether all the queued items were actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)
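Deciding when to stop based on the state of the working queue is a heuristic. The `multiprocessing` documentation describes `Queue.empty()` as unreliable, and a timeout only guesses that no more work is coming.

A commonly used, more deterministic alternative is to count outstanding work with `mp.JoinableQueue`:

- Each `put()` raises an unfinished-task counter and each `task_done()` lowers it.
- A worker re-queues `item + 1` before calling `task_done()` on the current item, so `working_q.join()` returns only after re-queued items have also been processed.
- After that, one `None` sentinel per worker stops the workers.
- Each worker echoes a sentinel to the output queue. The main process can then drain all results before joining the processes.

This is a sketch under those assumptions, using the question's `item ** 2` rule. `run` and `STOP` are hypothetical names.

```python
import multiprocessing as mp

STOP = None  # hypothetical sentinel ("poison pill") telling a worker to exit


def worker(working_q, output_q):
    """Toy rule from the question: square even items, re-queue odd items as item + 1."""
    while True:
        item = working_q.get()
        if item is STOP:
            working_q.task_done()
            output_q.put(STOP)  # announce that this worker will send nothing more
            return
        if item % 2 == 0:
            output_q.put(item ** 2)
        else:
            # put() raises the unfinished-task count before task_done() lowers it,
            # so working_q.join() cannot return while this new task is pending
            working_q.put(item + 1)
        working_q.task_done()


def run(items, n_workers=None):
    n_workers = n_workers or mp.cpu_count()
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for item in items:
        working_q.put(item)
    procs = [mp.Process(target=worker, args=(working_q, output_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    working_q.join()  # every item, including re-queued ones, has been processed
    for _ in procs:
        working_q.put(STOP)  # one sentinel per worker
    results, finished = [], 0
    # Drain output_q until every worker has signed off. A worker's results and
    # its STOP go through the same per-process feeder, so STOP arrives last.
    while finished < n_workers:
        result = output_q.get()
        if result is STOP:
            finished += 1
        else:
            results.append(result)
    for p in procs:
        p.join()  # safe now: nothing is left unread in output_q
    return results


if __name__ == "__main__":
    results = run(range(100))
    print(len(results))         # 100: each input item yields exactly one result
    print(sorted(results)[:6])  # [0, 4, 4, 16, 16, 36]
```

Draining the output queue before `join()` avoids the deadlock the documentation warns about when a child still has unflushed items on a queue.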
A similar question about Python multiprocessing with an updatable queue and an output queue can be found on Stack Overflow: https://stackoverflow.com/questions/21609595/
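On the side question in the original post about `(c)pickle.dump`: once the results have been collected into an ordinary list in the main process, as `results_bank` is in the answer, the standard `pickle` module can write it to a file. In Python 3 there is no separate `cPickle`; `pickle` uses its C accelerator automatically when available. The file location below is a hypothetical choice, and the list is a stand-in for the real results.

```python
import os
import pickle
import tempfile

results_bank = [0, 2, 2, 4]  # stand-in for the list collected from output_q

path = os.path.join(tempfile.gettempdir(), "results.pkl")  # hypothetical location
with open(path, "wb") as f:
    pickle.dump(results_bank, f)

with open(path, "rb") as f:
    restored = pickle.load(f)

print(restored == results_bank)  # True
```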