我正在用 Python 开发一个多线程应用程序。特别是,在这个应用程序中,一个线程应该能够生成一个事件,该事件应该通知一个(或多个)线程;接收到事件通知的线程应该中断它们的执行并运行特定的函数。在此服务功能结束时,他们应该返回到生成事件之前他们正在做的事情。
为了做这样的事情,我正在考虑使用某种发布/订阅模块。我找到了一个非常好用的:PyPubSub .你可以找到here一个关于如何使用它的极其简单的示例。
顺便说一下,当我开始使用它时,我意识到它可以满足我的要求,但前提是您只使用进程。如果您有更多线程,它会暂停整个进程(因此,其中的所有线程)以运行特定的例程。这实际上不是我想要的行为。不幸的是,我无法将我的应用程序从多线程更改为多进程。
你知道有什么模块可以帮助我在多线程应用程序中完成我想做的事情吗?谢谢。
最佳答案
除了通过多处理模块外,python 中没有真正的并发,因为 GIL 不是图片的一部分。
您想要执行的操作需要一个事件循环,您可以在其中检查事件队列并根据需要进行调度。 Pypubsub 可能会让你的生活更轻松,但对于你想要的东西来说可能有点矫枉过正(作为 pubsub 的作者,我很乐意这样说 :) 考虑到 mp 模块如何无缝集成多个进程,是否真的有理由不使用它如果并发真的是你所需要的?
您希望一个事件从任何线程转到一个或多个线程这一事实表明您可以使用任何线程都可以发布到的共享发布队列,其中数据指示哪种事件类型和事件数据。此外,每个线程都有一个消息队列:线程发布到共享的发布队列,主进程事件循环检查发布队列并根据需要将事件复制到单独的线程消息队列。每个线程都必须定期检查其队列并处理,删除已处理的事件。每个线程都可以订阅特定事件的主进程。
下面是 3 个相互发送消息的辅助线程的示例:
from multiprocessing import Process, Queue, Lock
from Queue import Empty as QueueEmpty
from random import randint
def log(lock, threadId, msg):
lock.acquire()
print 'Thread', threadId, ':', msg
lock.release()
def auxThread(id, lock, sendQueue, recvQueue, genType):
## Read from the queue
log(lock, id, 'starting')
while True:
# send a message (once in a while!)
if randint(1,10) > 7:
event = dict(type = genType, fromId = id, val = randint(1, 10) )
log(lock, id, 'putting message type "%(type)s" = %(val)s' % event)
sendQueue.put(event)
# block until we get a message:
maxWait = 1 # second
try:
msg = recvQueue.get(False, maxWait)
log(lock, id, 'got message type "%(type)s" = %(val)s from thread %(fromId)s' % msg)
if (msg['val'] == 'DONE'):
break
except QueueEmpty:
pass
log(lock, id, 'done')
def createThread(id, lock, postOffice, genType):
messagesForAux = Queue()
args = (id, lock, postOffice, messagesForAux, genType)
auxProc = Process(target=auxThread, args=args)
auxProc.daemon = True
return dict(q=messagesForAux, p=auxProc, id=id)
def mainThread():
postOffice = Queue() # where all threads post their messages
lock = Lock() # so print can be synchronized
# setup threads:
msgThreads = [
createThread(1, lock, postOffice, 'heartbeat'),
createThread(2, lock, postOffice, 'new_socket'),
createThread(3, lock, postOffice, 'keypress'),
]
# identify which threads listen for which messages
dispatch = dict(
heartbeat = (2,),
keypress = (1,),
new_socket = (3,),
)
# start all threads
for th in msgThreads:
th['p'].start()
# process messages
count = 0
while True:
try:
maxWait = 1 # second
msg = postOffice.get(False, maxWait)
for threadId in dispatch[msg['type']]:
thObj = msgThreads[threadId - 1]
thObj['q'].put(msg)
count += 1
if count > 20:
break
except QueueEmpty:
pass
log(lock, 0, "Main thread sending exit signal to aux threads")
for th in msgThreads:
th['q'].put(dict(type='command', val='DONE', fromId=0))
for th in msgThreads:
th['p'].join()
log(lock, th['id'], 'joined main')
log(lock, 0, "DONE")
if __name__ == '__main__':
mainThread()
你完全正确,这个描述与 pypubsub 功能有相似之处,但你只会使用 pypubsub 的一小部分,我认为你努力的大部分复杂性是两种类型的队列,pypubsub 对那个 pat 没有多大帮助的问题。使用 mp 模块(按照我的示例)让队列系统工作后,您可以引入 pypubsub 并发布/排队其消息,而不是您自己植入事件。
关于Python:线程管理其他线程通知的事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21763775/