我正在尝试使用多处理
在Python中创建一个简单的生产者/消费者模式。它可以工作,但它卡在 poll.join()
上。
from multiprocessing import Pool, Queue
que = Queue()
def consume():
while True:
element = que.get()
if element is None:
print('break')
break
print('Consumer closing')
def produce(nr):
que.put([nr] * 1000000)
print('Producer {} closing'.format(nr))
def main():
p = Pool(5)
p.apply_async(consume)
p.map(produce, range(5))
que.put(None)
print('None')
p.close()
p.join()
if __name__ == '__main__':
main()
示例输出:
~/Python/Examples $ ./multip_prod_cons.py
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing
但是,当我更改一行时,它可以完美地工作:
que.put([nr] * 100)
它在运行 Python 3.4.3 或 Python 2.7.10 的 Linux 系统上可 100% 重现。我错过了什么吗?
最佳答案
这里有很多困惑。您所写的不是生产者/消费者场景,而是滥用另一种通常称为“ worker 池”的模式的困惑。
worker 池模式是生产者/消费者模式的一种应用,其中有一个生产者负责调度工作,而许多消费者则消费它。在这种模式中,Pool
的所有者最终成为生产者,而 worker 将成为消费者。
在您的示例中,您有一个混合解决方案,其中一个工作人员最终成为消费者,其他工作人员充当中间件。整个设计效率非常低,重复了Pool
已经提供的大部分逻辑,更重要的是,很容易出错。你最终遭受的是 Deadlock 。
将对象放入 multiprocessing.Queue
是一个异步操作。仅当 Queue
已满并且您的 Queue
具有无限大小时,它才会阻塞。
这意味着您的product
函数会立即返回,因此对p.map
的调用不会像您期望的那样阻塞。相反,相关的工作进程会等待,直到实际消息通过 Queue
用作通信 channel 的 Pipe
。
接下来发生的事情是,当您将 None
“消息”放入 Queue
中时,您会提前终止消费者,该消息会在您的 生成的所有列表之前传递
函数 create 正确地通过 Pipe
推送。
您在调用 p.join
后就会注意到这个问题,但实际情况如下。
p.join
调用正在等待所有工作进程终止。- 工作进程正在等待大列表通过
队列
的管道
。 - 由于消费者 worker 早已不在,没有人会排空明显已满的
管道
。
该问题不会显示您的列表是否足够小,可以在您实际将终止消息发送到 consume
函数之前进行遍历。
关于python - Multiprocessing pool.join() 在某些情况下挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35590090/