Python:来自多个线程(或信号)的 Queue.get()

标签 python multithreading queue signals


如何在 Python 中从多个线程使用Queue.get()

我想做的是:一个线程用Queue.put(xxx)发送一个数据,并且某些线程得到相同的数据。 这个想法就像“信号”。我想在没有 PyQt 的情况下完成此操作。

例如:

#!/usr/bin/python
import threading
import Queue

queue= Queue.Queue()

def Func1():
  while True:
    data= queue.get()
    print 'Func1:got',data
    if data=='q':  break

def Func2():
  while True:
    data= queue.get()
    print 'Func2:got',data
    if data=='q':  break

def MainThread():
  while True:
    data= raw_input('q to quit > ')
    queue.put(data)
    if data=='q':  break

t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

这里我期望Func1和Func2从MainThread获取相同的数据,但是Func1和Func2只有一个可以获取数据。

如果您有好的想法,请告诉我。

非常感谢!


编辑于美国东部时间 2014 年 12 月 19 日 12:51

基于Reut Sharabani的想法,我实现了一个信号类。

#!/usr/bin/python
import threading
import Queue

class TSignal:
  def __init__(self):
    self.queues= {}  #Map from index to queue
    self.counter= 0
    self.locker= threading.Lock()
  def NewQueue(self):
    with self.locker:
      idx= self.counter
      self.counter+= 1
      self.queues[idx]= Queue.Queue()
    queue= self.TQueue(self,idx,self.queues[idx])
    return queue
  def DeleteQueue(self,idx):
    with self.locker:
      del self.queues[idx]
  def put(self,item,block=True,timeout=None):
    for idx,queue in self.queues.iteritems():
      queue.put(item,block,timeout)
  class TQueue:
    def __init__(self,parent,idx,queue):
      self.parent= parent
      self.idx= idx
      self.queue= queue
    def __enter__(self):
      return self
    def __exit__(self,e_type,e_value,e_traceback):
      self.parent.DeleteQueue(self.idx)
    def get(self,block=True,timeout=None):
      return self.queue.get(block,timeout)

signal= TSignal()

def Func1():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print '\nFunc1:got[%r]\n'%data
      if data=='q':  break

def Func2():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print '\nFunc2:got[%r]\n'%data
      if data=='q':  break

def MainThread():
  while True:
    data= raw_input('q to quit > ')
    signal.put(data)
    if data=='q':  break

t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

TSignal的使用非常简单。在 getter 函数中,插入一个 with 语句,例如:

with signal.NewQueue() as queue:

然后以与Queue.get()相同的方式使用队列:

data= queue.get()

在 putter 函数中,只需使用 put 即可,与 Queue.put() 相同:

signal.put(data)

问题在于,如果线程数为N,TSignal需要维护N个队列,而TSignal.put实际上调用了N次Queue.put。所以我还是想知道是否有更好的想法。

对此您有什么看法吗?

最佳答案

每个线程可以使用一个队列吗?如果是这样,您可以简单地使用每个线程自己的队列发布到它:

#!/usr/bin/python
import threading
import Queue

queue1 = Queue.Queue()
queue2 = Queue.Queue()


def func1():
    while True:
        data = queue1.get()
        print 'Func1:got', data
        if data == 'q':
            break


def func2():
    while True:
        data = queue2.get()
        print 'Func2:got', data
        if data == 'q':
            break


def main():
    while True:
        data = raw_input('q to quit > ')
        queue1.put(data)
        queue2.put(data)
        if data == 'q':
            break


t1 = threading.Thread(name='func1', target=func1)
t2 = threading.Thread(name='func2', target=func2)
tm = threading.Thread(name='main', target=main)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

编辑:

对于评论中的后续问题,这里有一个具有固定数量同步原语的机制。这个想法是使用函数和消息创建任务并将它们提交给线程池来执行。 (注意: python 3 有 Barriers 如果您选择其他实现,这里可能会很方便):

#!/usr/bin/python
import threading
import Queue
from multiprocessing.pool import ThreadPool

MAX_THREADS = 10

publish_queue = Queue.Queue()
print_lock = threading.Lock()


def sync_print(msg):
    print_lock.acquire()
    print msg
    print_lock.release()

# the manager actually holds a pool of threads
# he gives tasks to. The tasks are the functions you mean
# to execute zipped with the message.
def manager(functions):
    pool = ThreadPool(min(len(functions), MAX_THREADS))
    while True:
        sync_print("Manager waiting for message")
        message = publish_queue.get()
        sync_print("Manager got message %s" % message)
        if message == 'q':
            pool.close()
            pool.terminate()
            break;
        else:
            # create tasks of form: (function, message)
            tasks = zip(functions, [message] * len(functions))
            pool.map(lambda x: x[0](x[1]), tasks)


def func1(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))


def func2(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))


def main():
    while True:
        data = raw_input('q to quit > ')
        # wait for all threads to consume
        publish_queue.put(data)
        if data == 'q':
            break

# the functions you want to execute on each message - these were your threads
functions = [
    func1,
    func2
]

main = threading.Thread(name='main', target=main)
manager = threading.Thread(name='manager', target=manager, args=(functions, ))
manager.start()
main.start()

main.join()

希望这适合您的情况,因为它可能会占用大量处理时间。

关于Python:来自多个线程(或信号)的 Queue.get(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27561654/

相关文章:

python - 手动双向 torch.nn.RNN 实现

c - 不使用pthread_detach或pthread_join,不会为其他新创建的线程清理资源吗?

multithreading - python的queue.Queue.put()方法是异步的吗?

java - 降低内循环的时间复杂度 : Find count of elements greater than current element in the first loop and store that in solved array

java - 有很多空对象会耗尽内存吗?如果是这样,是否有使用 ArrayDeque 作为队列的替代方法?

python - 在 tkinter 滚动条中显示文件

python - 将字典转换为 Pandas 中的数据框列

Python Tkinter : Why use Tkinter. W 不是 str "w"

ruby - 在 Ruby 哈希中并行创建键值对的问题

python - 为什么python 2.5中threading.Thread通过阻塞执行来同步操作?