python - 多重处理 - 生产者/消费者设计

标签 python multiprocessing

我正在使用多处理模块拆分一个非常大的任务。它在大多数情况下都有效,但我的设计中肯定遗漏了一些明显的东西,因为通过这种方式,我很难有效地判断何时处理完所有数据。

我有两个单独的任务在运行;一个喂养另一个。我想这是一个生产者/消费者问题。我在所有进程之间使用共享队列,生产者填满队列,消费者从队列中读取并进行处理。问题是数据量有限,所以在某个时候每个人都需要知道所有数据都已处理,以便系统可以正常关闭。

使用 map_async() 函数似乎是有意义的,但是由于生产者正在填满队列,我不知道前面的所有项目,所以我必须进入 while 循环并使用apply_async() 并尝试检测何时一切都完成了某种超时......丑陋。

我觉得我遗漏了一些明显的东西。如何更好地设计?

制作人

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

消费者

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

主要

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

这就是它变得俗气的地方。我不能使用 map ,因为要消耗的列表同时被填满。所以我必须进入 while 循环并尝试检测超时。当生产者仍在尝试填充它时,consumer_queue 可能会变空,所以我不能只检测到一个空队列并在其上退出。

    timed_out = False
    timeout= 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

我想也许我可以在主线程中获取()记录并将它们传递给消费者而不是传递队列,但我认为我最终遇到了同样的问题。我仍然需要运行一个 while 循环并使用 apply_async() 预先感谢您的任何建议!

最佳答案

您可以使用 manager.Event 来表示工作结束。这个事件可以在你的所有进程之间共享,然后当你从你的主进程发出信号时,其他工作人员可以优雅地关闭。

while not event.is_set():
 ...rest of code...

因此,您的消费者将等待设置事件并在设置后处理清理。

要确定何时设置此标志,您可以在生产者线程上执行 join,当这些都完成后,您可以在消费者线程上加入。

关于python - 多重处理 - 生产者/消费者设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13980160/

相关文章:

Python - 多处理的奇怪行为 - 连接不执行

python - Macintosh wxPython EVT_TASKBAR_LEFT_UP 或替代

python - 谷歌地图 API 的命中率限制,但不知道为什么

python - 进程完成后如何停止python多处理服务器

python - 并行处理 Airflow 上的百万个文件列表

python - 如何在导入的模块中使用 multiprocessing.Pool?

python - 服务器的非阻塞套接字

python - 如何检查某个连续子数组的总和是否等于 N?

python - Selenium Chrome 获取文本在 headless 模式下不起作用

python - 如果 XHR 收到 abort(),我该如何终止长时间运行的 Django 请求?