我正在使用 multiprocessing.Pool
和 multiprocessing.Queue
在 python 中实现生产者-消费者模式。消费者是使用 gevent
生成多个任务的预 fork 进程。
这是一个精简版的代码:
import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time
# Task queue
queue = Queue()
def init_worker ():
# Ignore signals in worker
signal.signal( signal.SIGTERM, signal.SIG_IGN )
signal.signal( signal.SIGINT, signal.SIG_IGN )
signal.signal( signal.SIGQUIT, signal.SIG_IGN )
# One of the worker task
def worker_task1( ):
while True:
try:
m = queue.get( timeout = 2 )
# Break out if producer says quit
if m == 'QUIT':
print 'TIME TO QUIT'
break
except QueueEmpty:
pass
# Worker
def work( ):
gevent.joinall([
gevent.spawn( worker_task1 ),
])
pool = Pool( 2, init_worker )
for i in xrange( 2 ):
pool.apply_async( work )
try:
while True:
queue.put( 'Some Task' )
time.sleep( 2 )
except KeyboardInterrupt as e:
print 'STOPPING'
# Signal all workers to quit
for i in xrange( 2 ):
queue.put( 'QUIT' )
pool.join()
现在当我尝试退出时,我得到以下状态:
- 父进程正在等待其中一个子进程加入。
- 其中一个 child 处于不复存在状态。已完成,但 parent 正在等待其他 child 完成。
- 其他 child 正在显示:
futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...
.
那么干净地结束这样一个过程的正确方法是什么?
最佳答案
我发现了问题所在。根据documentation for multiprocessing.Pool.join()
, pool
需要先 close()ed
才能被 join()ed
。在 pool.join()
之前添加 pool.close()
解决了这个问题。
关于python - 在无限循环中停止附加到队列的python多处理 worker 的最干净方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16674485/