我正在尝试使用以下方法实现多线程生产者-消费者模式 Python 2.7 中的 Queue.Queue。我想弄清楚如何制作 消费者,即工作线程,在完成所有必需的工作后停止。
请参阅 Martin James 对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080
Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.
但这对我不起作用。例如,请参见以下代码。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
这个程序在所有三个工作人员都读取了退出指示器后挂起,
即 -1
来自队列,因为每个工作人员在之前重新排队 -1
退出,因此队列永远不会变空并且 q.join()
永远不会返回。
我提出了以下但丑陋的解决方案,我发送了一个 -1
退出
通过队列为每个工作人员提供指示器,以便每个工作人员都可以看到它
并自杀。但事实上我必须发送一个退出指示器
对于每个 worker 都觉得有点丑。
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
我有两个问题。
- 为所有线程发送单个退出指示符的方法(如 Martin James 在 https://stackoverflow.com/a/19369877/1175080 的第二条评论中所解释的那样)是否可行?
- 如果对上一个问题的回答是“否”,有没有一种方法可以让我不必为每个工作线程发送单独的退出指示符来解决问题?
最佳答案
不要将其称为任务的特例。
使用 Event相反,为您的工作人员提供非阻塞实现。
stopping = threading.Event()
def worker(n, q, timeout=1):
# run until the master thread indicates we're done
while not stopping.is_set():
try:
# don't block indefinitely so we can return to the top
# of the loop and check the stopping event
data = q.get(True, timeout)
# raised by q.get if we reach the timeout on an empty queue
except queue.Empty:
continue
q.task_done()
def master():
...
print 'waiting for workers to finish'
q.join()
stopping.set()
print 'done'
关于python - 在多线程生产者-消费者模式下,如何让工作线程在工作完成后退出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45169559/