python - Multiprocessing pool.join() 在某些情况下挂起

标签 python multiprocessing producer-consumer pool

我正在尝试使用多处理在Python中创建一个简单的生产者/消费者模式。它可以工作,但它卡在 poll.join() 上。

from multiprocessing import Pool, Queue

que = Queue()


def consume():
    while True:
        element = que.get()
        if element is None:
            print('break')
            break
    print('Consumer closing')


def produce(nr):
    que.put([nr] * 1000000)
    print('Producer {} closing'.format(nr))


def main():
    p = Pool(5)
    p.apply_async(consume)
    p.map(produce, range(5))
    que.put(None)
    print('None')
    p.close()
    p.join()


if __name__ == '__main__':
    main()

示例输出:

~/Python/Examples $ ./multip_prod_cons.py 
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing

但是,当我更改一行时,它可以完美地工作:

que.put([nr] * 100)

它在运行 Python 3.4.3 或 Python 2.7.10 的 Linux 系统上可 100% 重现。我错过了什么吗?

最佳答案

这里有很多困惑。您所写的不是生产者/消费者场景,而是滥用另一种通常称为“ worker 池”的模式的困惑。

worker 池模式是生产者/消费者模式的一种应用,其中有一个生产者负责调度工作,而许多消费者则消费它。在这种模式中,Pool 的所有者最终成为生产者,而 worker 将成为消费者。

在您的示例中,您有一个混合解决方案,其中一个工作人员最终成为消费者,其他工作人员充当中间件。整个设计效率非常低,重复了Pool已经提供的大部分逻辑,更重要的是,很容易出错。你最终遭受的是 Deadlock

将对象放入 multiprocessing.Queue 是一个异步操作。仅当 Queue 已满并且您的 Queue 具有无限大小时,它才会阻塞。

这意味着您的product函数会立即返回,因此对p.map的调用不会像您期望的那样阻塞。相反,相关的工作进程会等待,直到实际消息通过 Queue 用作通信 channel 的 Pipe

接下来发生的事情是,当您将 None “消息”放入 Queue 中时,您会提前终止消费者,该消息会在您的 生成的所有列表之前传递 函数 create 正确地通过 Pipe 推送。

您在调用 p.join 后就会注意到这个问题,但实际情况如下。

  • p.join 调用正在等待所有工作进程终止。
  • 工作进程正在等待大列表通过队列管道
  • 由于消费者 worker 早已不在,没有人会排空明显已满的管道

该问题不会显示您的列表是否足够小,可以在您实际将终止消息发送到 consume 函数之前进行遍历。

关于python - Multiprocessing pool.join() 在某些情况下挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35590090/

相关文章:

python - 如何让 Intellij-idea 正确关闭 Flask 开发服务器?

Python Multiprocessing 被 selenium 卡住了

c++ - 使用 multimap 的多线程

scala - Akka/斯卡拉 : Can You Explain What's Going On in this Akka Streams Flow?

python - django 迁移 - 具有多个开发分支的工作流

python - 无法将 CSS 应用于 GtkEntry 小部件 (GTK3/gi/Python) 不工作

python - 带 numpy.convolve 的加权移动平均线

Python multiprocessing.Manager 和 os.fork 产生奇怪的行为

java - java代码是否自动利用多个处理器内核(如果可用)

c# - 引导多个生产者和消费者