我正在构建一个 Web 应用程序来处理大约 60,000 个(并且还在不断增加)大文件,执行一些分析并返回需要用户验证的“最佳猜测”。文件将按类别细化以避免加载每个文件,但我仍然面临可能需要一次处理 1000 多个文件的场景。
这些是大型文件,每个文件最多可能需要 8-9 秒来处理,在 1000 多个文件的情况下,让用户在审查之间等待 8 秒或在手动处理文件时等待 2 小时以上是不切实际的。
为了克服这个问题,我决定使用多处理来生成多个工作程序,每个工作程序将从文件队列中选取文件、处理它们并插入到输出队列中。我有另一种方法,基本上轮询输出队列中的项目,然后在可用时将它们流式传输到客户端。
这很有效,直到队列任意停止返回项目的一部分。我们在我们的环境中将 gevent 与 Django 和 uwsgi 一起使用,我知道在 gevent 的上下文中通过多处理创建子进程会在子进程中产生不需要的事件循环状态。在 fork 之前生成的小绿叶在 child 中复制。因此,我决定使用 gipc协助处理子进程。
我的代码的示例(我无法发布我的实际代码):
import multiprocessing
import gipc
from item import Item
MAX_WORKERS = 10
class ProcessFiles(object):
def __init__(self):
self.input_queue = multiprocessing.Queue()
self.output_queue = multiprocessing.Queue()
self.file_count = 0
def query_for_results(self):
# Query db for records of files to process.
# Return results and set self.file_count equal to
# the number of records returned.
pass
# The subprocess.
def worker(self):
# Chisel away at the input queue until no items remain.
while True:
if self.no_items_remain():
return
item = self.input_queue.get(item)
item.process()
self.output_queue.put(item)
def start(self):
# Get results and store in Queue for processing
results = self.query_for_results()
for result in results:
item = Item(result)
self.input_queue.put(item)
# Spawn workers to process files.
for _ in xrange(MAX_WORKERS):
process = gipc.start_process(self.worker)
# Poll for items to send to client.
return self.get_processed_items()
def get_processed_items(self):
# Wait for the output queue to hold at least 1 item.
# When an item becomes available, yield it to client.
count = 0
while count != self.file_count:
#item = self._get_processed_item()
# Debugging:
try:
item = self.output_queue.get(timeout=1)
except:
print '\tError fetching processed item. Retrying...'
continue
if item:
print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
count += 1
yield item
yield 'end'
我希望输出显示处理和生成项目后队列的当前计数:
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1
但是,该脚本在失败之前只设法产生了一些项目:
QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
Error fetching processed item. Retrying...
...
我的问题是:到底发生了什么?为什么我不能从队列中get
?我怎样才能退回我期望的元素并避免这种情况?
最佳答案
当您无法获得元素时抛出的实际异常是什么?您正在盲目地捕获所有可能抛出的异常。此外,为什么不使用没有超时的 get
呢?您立即重试,不做任何其他事情。可能只是调用以获取 block ,直到项目准备就绪。
关于这个问题,我认为发生的事情是 gipc
正在关闭与您的队列关联的管道,从而破坏了队列。我希望抛出 OSError
而不是 queue.Empty
。看这个bug report了解详情。
作为替代方案,您可以使用进程池,在任何 gevent
事情发生之前启动池(这意味着您不必担心事件循环问题)。使用 imap_unordered
将作业提交到池中你应该没事的。
你的启动函数看起来像这样:
def start(self):
results = self.query_for_results()
return self.pool.imap_unordered(self.worker, results,
chunksize=len(results) // self.num_procs_in_pool)
@staticmethod
def worker(item):
item.process()
return item
关于python - 无法从 multiprocessing.Queue .get(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27384647/