python - 在多线程生产者-消费者模式下,如何让工作线程在工作完成后退出?

标签 python multithreading python-2.7 concurrency producer-consumer

我正在尝试使用以下方法实现多线程生产者-消费者模式 Python 2.7 中的 Queue.Queue。我想弄清楚如何制作 消费者,即工作线程,在完成所有必需的工作后停止。

请参阅 Martin James 对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080

Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.

但这对我不起作用。例如,请参见以下代码。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

这个程序在所有三个工作人员都读取了退出指示器后挂起, 即 -1 来自队列,因为每个工作人员在之前重新排队 -1 退出,因此队列永远不会变空并且 q.join() 永远不会返回。

我提出了以下但丑陋的解决方案,我发送了一个 -1 退出 通过队列为每个工作人员提供指示器,以便每个工作人员都可以看到它 并自杀。但事实上我必须发送一个退出指示器 对于每个 worker 都觉得有点丑。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

我有两个问题。

  1. 为所有线程发送单个退出指示符的方法(如 Martin James 在 https://stackoverflow.com/a/19369877/1175080 的第二条评论中所解释的那样)是否可行?
  2. 如果对上一个问题的回答是“否”,有没有一种方法可以让我不必为每个工作线程发送单独的退出指示符来解决问题?

最佳答案

不要将其称为任务的特例。

使用 Event相反,为您的工作人员提供非阻塞实现。

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except queue.Empty:
            continue
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'

关于python - 在多线程生产者-消费者模式下,如何让工作线程在工作完成后退出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45169559/

相关文章:

python - 如何使用 pyglet 在 Mayavi 动画中播放声音?

python - Haystack-Django 模板不存在于/search/Error

Python 2.7 变量类型保持为 int32 而不是转换为 long

java - Vert.x 和 Netty 有什么区别?

multithreading - 主线程 “blocked on critical section which is abandoned”

python - Odoo Many 2 Many 选择字段

python : Converting time string with different timezone to host timezone datetime object?

python - 使用 Python 装饰器为方法添加方法

python - Django - 使用给定的 sql 转储来创建其他模型并填充数据

java - java中的Volatile关键字错误