python - 无法从 multiprocessing.Queue .get()

标签 python queue multiprocessing gevent gipc

我正在构建一个 Web 应用程序来处理大约 60,000 个(并且还在不断增加)大文件,执行一些分析并返回需要用户验证的“最佳猜测”。文件将按类别细化以避免加载每个文件,但我仍然面临可能需要一次处理 1000 多个文件的场景。

这些是大型文件,每个文件最多可能需要 8-9 秒来处理,在 1000 多个文件的情况下,让用户在审查之间等待 8 秒或在手动处理文件时等待 2 小时以上是不切实际的。

为了克服这个问题,我决定使用多处理来生成多个工作程序,每个工作程序将从文件队列中选取文件、处理它们并插入到输出队列中。我有另一种方法,基本上轮询输出队列中的项目,然后在可用时将它们流式传输到客户端。

这很有效,直到队列任意停止返回项目的一部分。我们在我们的环境中将 gevent 与 Django 和 uwsgi 一起使用,我知道在 gevent 的上下文中通过多处理创建子进程会在子进程中产生不需要的事件循环状态。在 fork 之前生成的小绿叶在 child 中复制。因此,我决定使用 gipc协助处理子进程。

我的代码的示例(我无法发布我的实际代码):

import multiprocessing
import gipc
from item import Item

MAX_WORKERS = 10

class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query db for records of files to process.
        # Return results and set self.file_count equal to
        # the number of records returned.
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return

            item = self.input_queue.get(item)
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store in Queue for processing
        results = self.query_for_results()
        for result in results:
             item = Item(result)
             self.input_queue.put(item)

        # Spawn workers to process files.
        for _ in xrange(MAX_WORKERS):
            process = gipc.start_process(self.worker)

        # Poll for items to send to client.
        return self.get_processed_items()

    def get_processed_items(self):

        # Wait for the output queue to hold at least 1 item.
        # When an item becomes available, yield it to client.
        count = 0
        while count != self.file_count:
            #item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item
        yield 'end'

我希望输出显示处理和生成项目后队列的当前计数:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1

但是,该脚本在失败之前只设法产生了一些项目:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    ...

我的问题是:到底发生了什么?为什么我不能从队列中get?我怎样才能退回我期望的元素并避免这种情况?

最佳答案

当您无法获得元素时抛出的实际异常是什么?您正在盲目地捕获所有可能抛出的异常。此外,为什么不使用没有超时的 get 呢?您立即重试,不做任何其他事情。可能只是调用以获取 block ,直到项目准备就绪。

关于这个问题,我认为发生的事情是 gipc 正在关闭与您的队列关联的管道,从而破坏了队列。我希望抛出 OSError 而不是 queue.Empty。看这个bug report了解详情。

作为替代方案,您可以使用进程池,在任何 gevent 事情发生之前启动池(这意味着您不必担心事件循环问题)。使用 imap_unordered 将作业提交到池中你应该没事的。

你的启动函数看起来像这样:

def start(self):
    results = self.query_for_results()
    return self.pool.imap_unordered(self.worker, results, 
        chunksize=len(results) // self.num_procs_in_pool)

@staticmethod
def worker(item):
    item.process()
    return item

关于python - 无法从 multiprocessing.Queue .get(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27384647/

相关文章:

node.js - Node 和 Redis 队列

解释器中的python多处理池断言错误

Python子进程的cpu使用率随机下降到0%,导致进程为 "hang up"

python - Matplotlib:如何为绘图的背景颜色添加效果?

python - 对所有值列求和并根据组在 df 中添加总行

spring - 使用 Spring Batch 或 Quartz 调度程序来调度作业

python - 如何从并行进程中运行的函数中检索值?

python - 快速排序更快地对较大的数字进行排序?

python - 根据另一个数据框 pandas 的值添​​加列

prolog - Prolog 中的差异列表和可变变量