python - imap_unordered() 如果 iterable 抛出错误则挂起

标签 python multiprocessing

考虑文件 sample.py 包含以下代码:

from multiprocessing import Pool

def sample_worker(x):
    print "sample_worker processes item", x
    return x

def get_sample_sequence():
    for i in xrange(2,30):
        if i % 10 == 0:
            raise Exception('That sequence is corrupted!')
        yield i

if __name__ == "__main__":
    pool = Pool(24)
    try:
        for x in pool.imap_unordered(sample_worker, get_sample_sequence()):
            print "sample_worker returned value", x
    except:
        print "Outer exception caught!"
    pool.close()
    pool.join()
    print "done"

当我执行它时,我得到以下输出:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 338, in _handle_tasks
    for i, task in enumerate(taskseq):
  File "C:\Python27\lib\multiprocessing\pool.py", line 278, in <genexpr>
    self._taskqueue.put((((result._job, i, func, (x,), {})
  File "C:\Users\renat-nasyrov\Desktop\sample.py", line 10, in get_sample_sequence
    raise Exception('That sequence is corrupted!')
Exception: That sequence is corrupted!

之后,应用挂起。如何在不挂断的情况下处理这种情况?

最佳答案

正如 septi 所提到的,您的缩进(仍然)是错误的。缩进 yield 语句,使 i 在其范围内。我不完全确定您的代码中实际发生了什么,但是生成超出范围的变量似乎不是一个好主意:

from multiprocessing import Pool

def sample_worker(x):
    print "sample_worker processes item", x
    return x

def get_sample_sequence():
    for i in xrange(2,30):
      if i % 10 == 0:
          raise Exception('That sequence is corrupted!')
      yield i # fixed

if __name__ == "__main__":
    pool = Pool(24)
    try:
        for x in pool.imap_unordered(sample_worker, get_sample_sequence()):
            print "sample_worker returned value", x
    except:
        print "Outer exception caught!"
    pool.close()
    pool.join()
    print "done"

要处理生成器中的异常,您可以使用像这样的包装器:

import logging
def robust_generator():
    try:
        for i in get_sample_sequence():
            logging.debug("yield "+str(i))
            yield i
    except Exception, e:
        logging.exception(e)
        raise StopIteration()

关于python - imap_unordered() 如果 iterable 抛出错误则挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27550558/

相关文章:

python - 在单独的工作人员中更新标签(流程实例)

python - 我什么时候会获取 block 状态设置为 False 的锁?

Python 多处理示例不执行任何操作

python - 如何检测小部件的类型?

Python Tf idf算法

python - 如何使用 python 脚本将参数传递给 linux 命令提示符

c++ - 打开 Mp 嵌套并行

线程可以不立即部署信号吗?

python - 我怎样才能避免: "ZipFile instance has no attribute ' __exit_ _'' "when extracting a zip file?

python - 返回类的新实例的方法