我在使用 pydispatch 模块进行线程间通信时遇到了问题。我使用了此处提供的示例:https://sites.google.com/site/hardwaremonkey/blog/python-howtocommunicatebetweenthreadsusingpydispatch
我稍微修改了它以在日志中提供更详细的信息。特别是,我还让它显示了实际的线程名称:
from pydispatch import dispatcher
import threading
import time
import logging
log_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(name)s] [%(threadName)s] %(message)s', '%H:%M:%S')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
ALICE_SIGNAL='alice_signal'
ALICE_SENDER='alice_sender'
BOB_SIGNAL='bob_signal'
BOB_SENDER='bob_sender'
class Alice():
''' alice procrastinates and replies to bob'''
def __init__(self):
logger.debug('Alice instantiated')
dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
self.alice()
def alice_dispatcher_receive(self, message):
''' handle dispatcher'''
logger.debug('Alice has received message: {}'.format(message))
dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)
def alice(self):
''' loop and wait '''
while True:
logger.debug('Alice is procrastinating')
time.sleep(1)
class Bob():
''' bob contacts alice periodically '''
def __init__(self):
logger.debug('Bob instantiated')
dispatcher.connect(self.bob_dispatcher_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
self.bob()
def bob_dispatcher_receive(self, message):
''' handle dispatcher '''
logger.debug('Bob has received message: {}'.format(message))
def bob(self):
''' loop and send messages using a dispatcher '''
while True:
dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
time.sleep(3)
if __name__ == '__main__':
logger.debug('Starting...')
alice_thread = threading.Thread(target=Alice, name='Thread-Alice')
alice_thread.start()
bob_thread = threading.Thread(target=Bob, name='Thread-Bob')
bob_thread.start()
这是我发现的:
08:10:43 DEBUG [root] [MainThread] Starting...
08:10:43 DEBUG [root] [Thread-Alice] Alice instantiated
08:10:43 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:43 DEBUG [root] [Thread-Bob] Bob instantiated
08:10:43 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:43 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:44 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:45 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:46 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:46 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:46 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:47 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:48 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:49 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:49 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:49 DEBUG [root] [Thread-Alice] Alice is procrastinating
看这个:
[Thread-Bob] Alice has received message: message from Bob
“Alice has received message”已经在Bob的线程中执行。虽然我预计它会在 Alice 的线程中执行。据我了解,调度程序收到来自 Bob 的信号并直接在同一个线程中调用处理程序。所以它实际上从 Bob 的线程调用了 Alice 的代码,导致意想不到的细微差别。
问题#1。当 Alice 处理信号时,Bob 的执行实际上被阻止了。
问题#2。在较大的应用程序中,Alice 的代码可能会意外地在多个并行线程中执行。
问题#3。一般封装性差。我们希望 Alice 和 Bob 在各自的线程中运行在一个实例中,彼此独立,只交换消息。这里不是这种情况,因为它们实际上调用了彼此的代码。
有没有办法为 pydispatcher 解决这个问题?或者您可以推荐另一个没有这些问题的线程间通信库吗?
最佳答案
找到了使用 event_loop.call_soon_threadsafe() 的解决方案。
现在是代码:
def register_signal_handler(loop, handler, signal, sender):
def dispatcher_receive(message):
loop.call_soon_threadsafe(handler, message)
dispatcher.connect(dispatcher_receive, signal=signal, sender=sender, weak=False)
class Alice():
def __init__(self):
logger.debug('Alice instantiated')
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
register_signal_handler(self.loop, self.alice_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
self.alice()
def alice_receive(self, message):
logger.debug('Alice has received message: {}'.format(message))
dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)
def alice(self):
def procrastinate():
logger.debug('Alice is procrastinating')
self.loop.call_later(1, procrastinate)
procrastinate()
self.loop.run_forever()
class Bob():
def __init__(self):
logger.debug('Bob instantiated')
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
register_signal_handler(self.loop, self.bob_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
self.bob()
def bob_receive(self, message):
logger.debug('Bob has received message: {}'.format(message))
def bob(self):
def poke_alice():
dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
self.loop.call_later(3, poke_alice)
self.loop.call_later(3, poke_alice)
self.loop.run_forever()
因此,当消息从 Bob 到达 Alice 时,信号处理程序实际上并不执行处理消息的工作,而只是安排执行该工作的实际处理程序的执行。实际的处理程序将在 Alice 的线程中被调用。
在这种情况下:
- 信号最初总是近乎即时地处理,从不阻塞发送者的线程。
- Alice 的代码总是在Alice 的线程中执行。 Bob 的代码总是在 Bob 的线程中执行。
所以我的目标实现了。
你们认为这是一个好的解决方案吗?很想听听对此的评论。
关于python - 在线程之间使用 pydispatch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47481909/