python - 如何使用多处理从输出队列中获取 "batch write"?

标签 python multiprocessing

假设我有以下多处理结构:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break 
        else:
            picked = working_queue.get()
            res_item = "Number " + str(picked)
            output_queue.put(res_item)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(2)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    if len(results_bank) == len(static_input):
        print "Good run"
    else:
        print "Bad run"

我的问题:当工作队列仍在“工作”(或至少未完成)时,如何将结果“批量”写入单个文件?

注意:我的实际数据结构对相对于输入的无序结果不敏感(尽管我的示例使用整数)。

此外,我认为从输出队列进行批量/集合写入是最佳实践,而不是从不断增长的结果库对象进行写入。然而,我对依赖这两种方法的解决方案持开放态度。我是多重处理新手,因此不确定此问题的最佳实践或最有效的解决方案。

最佳答案

如果您想使用mp.Processmp.Queue,这里有一种批量处理结果的方法。主要思想在 writer 函数中,如下:

import itertools as IT
import multiprocessing as mp
SENTINEL = None
static_len = 100

def worker(working_queue, output_queue):
    for picked in iter(working_queue.get, SENTINEL):
        res_item = "Number {:2d}".format(picked)
        output_queue.put(res_item)

def writer(output_queue, threshold=10):
    result_length = 0
    items = iter(output_queue.get, SENTINEL)
    for batch in iter(lambda: list(IT.islice(items, threshold)), []):
        print('\n'.join(batch))
        result_length += len(batch)
    state = 'Good run' if result_length == static_len else 'Bad run'
    print(state)

if __name__ == '__main__':
    num_workers = 2

    static_input = range(static_len)
    working_q = mp.Queue()
    output_q = mp.Queue()

    writer_proc = mp.Process(target=writer, args=(output_q,))
    writer_proc.start()

    for i in static_input:
        working_q.put(i)

    processes = [mp.Process(target=worker, args=(working_q, output_q)) 
                 for i in range(num_workers)]
    for proc in processes:
        proc.start()
        # Put SENTINELs in the Queue to tell the workers to exit their for-loop
        working_q.put(SENTINEL)
    for proc in processes:
        proc.join()

    output_q.put(SENTINEL)
    writer_proc.join()
<小时/>

当传递两个参数时,iter需要一个可调用的和一个哨兵: iter(可调用,哨兵)。可调用对象(即函数)会被重复调用,直到它返回等于哨兵的值为止。所以

items = iter(output_queue.get, SENTINEL)

items定义为一个可迭代对象,当迭代时,将从output_queue返回项目 直到output_queue.get()返回SENTINEL

for 循环:

for batch in iter(lambda: list(IT.islice(items, threshold)), []):

重复调用 lambda 函数,直到返回空列表。调用时,lambda 函数会返回可迭代 items 中最多 threshold 个项目的列表。因此,这是“按 n 个项目进行分组无填充”的惯用语。请参阅this post有关此习语的更多信息。

<小时/>

请注意,测试 working_q.empty() 并不是一个好的做法。这可能会导致竞争状况。例如,假设当 working_q 中只剩下 1 个项目时,这些行上有 2 个 worker 进程:

def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:        <-- Process-1
            break 
        else:
            picked = working_queue.get()         <-- Process-2
            res_item = "Number " + str(picked)
            output_queue.put(res_item)
    return

假设当队列中仍有一项时,Process-1 调用 working_queue.empty()。所以它返回False。然后Process-2调用working_queue.get()并获取最后一项。然后,Process-1 到达 picked =working_queue.get() 行并挂起,因为队列中没有更多项目。

因此,使用哨兵(如上所示)在 for 循环 时具体发出信号 或 while-loop 应该停止而不是检查 queue.empty()

关于python - 如何使用多处理从输出队列中获取 "batch write"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43074954/

相关文章:

c - 多处理器系统上的线程

python-3.x - Python 中的快速 GUID

python - 加权边缘如何影响networkx中的PageRank?

python - 压缩 GraphQL 查询?

python - pydev/python 中的 "Undefined variable : main"

python multiprocessing.Process 执行了一个错误的目标(用py2exe打包)

Python 多处理 EOF

python - 如何使用 xlwings 清除 Excel 中的筛选器?

python - 两对方位角和高度之间的角度?

python - 使用多处理实现排序的生产者/消费者队列