python - 在线程之间使用 pydispatch

标签 python multithreading python-3.x events

我在使用 pydispatch 模块进行线程间通信时遇到了问题。我使用了此处提供的示例:https://sites.google.com/site/hardwaremonkey/blog/python-howtocommunicatebetweenthreadsusingpydispatch

我稍微修改了它以在日志中提供更详细的信息。特别是,我还让它显示了实际的线程名称:

from pydispatch import dispatcher
import threading
import time
import logging

log_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(name)s] [%(threadName)s] %(message)s', '%H:%M:%S')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)


ALICE_SIGNAL='alice_signal'
ALICE_SENDER='alice_sender'
BOB_SIGNAL='bob_signal'
BOB_SENDER='bob_sender'

class Alice():
    ''' alice procrastinates and replies to bob'''
    def __init__(self):
        logger.debug('Alice instantiated')
        dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
        self.alice()

    def alice_dispatcher_receive(self, message):
        ''' handle dispatcher'''
        logger.debug('Alice has received message: {}'.format(message))
        dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)

    def alice(self):
        ''' loop and wait '''
        while True:
            logger.debug('Alice is procrastinating')
            time.sleep(1)  

class Bob():
    ''' bob contacts alice periodically '''
    def __init__(self):
        logger.debug('Bob instantiated')
        dispatcher.connect(self.bob_dispatcher_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
        self.bob()

    def bob_dispatcher_receive(self, message):
        ''' handle dispatcher '''
        logger.debug('Bob has received message: {}'.format(message))

    def bob(self):
        ''' loop and send messages using a dispatcher '''
        while True:
            dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
            time.sleep(3)


if __name__ == '__main__':
    logger.debug('Starting...')
    alice_thread = threading.Thread(target=Alice, name='Thread-Alice')
    alice_thread.start()
    bob_thread = threading.Thread(target=Bob, name='Thread-Bob')
    bob_thread.start()

这是我发现的:

08:10:43 DEBUG [root] [MainThread] Starting...
08:10:43 DEBUG [root] [Thread-Alice] Alice instantiated
08:10:43 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:43 DEBUG [root] [Thread-Bob] Bob instantiated
08:10:43 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:43 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:44 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:45 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:46 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:46 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:46 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:47 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:48 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:49 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:49 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:49 DEBUG [root] [Thread-Alice] Alice is procrastinating

看这个:

[Thread-Bob] Alice has received message: message from Bob

“Alice has received message”已经在Bob的线程中执行。虽然我预计它会在 Alice 的线程中执行。据我了解,调度程序收到来自 Bob 的信号并直接在同一个线程中调用处理程序。所以它实际上从 Bob 的线程调用了 Alice 的代码,导致意想不到的细微差别。

问题#1。当 Alice 处理信号时,Bob 的执行实际上被阻止了。

问题#2。在较大的应用程序中,Alice 的代码可能会意外地在多个并行线程中执行。

问题#3。一般封装性差。我们希望 Alice 和 Bob 在各自的线程中运行在一个实例中,彼此独立,只交换消息。这里不是这种情况,因为它们实际上调用了彼此的代码。

有没有办法为 pydispatcher 解决这个问题?或者您可以推荐另一个没有这些问题的线程间通信库吗?

最佳答案

找到了使用 event_loop.call_soon_threadsafe() 的解决方案。

现在是代码:

def register_signal_handler(loop, handler, signal, sender):
    def dispatcher_receive(message):
        loop.call_soon_threadsafe(handler, message)
    dispatcher.connect(dispatcher_receive, signal=signal, sender=sender, weak=False)


class Alice():
    def __init__(self):
        logger.debug('Alice instantiated')
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        register_signal_handler(self.loop, self.alice_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
        self.alice()

    def alice_receive(self, message):
        logger.debug('Alice has received message: {}'.format(message))
        dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)

    def alice(self):
        def procrastinate():
            logger.debug('Alice is procrastinating')
            self.loop.call_later(1, procrastinate)
        procrastinate()
        self.loop.run_forever()

class Bob():
    def __init__(self):
        logger.debug('Bob instantiated')
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        register_signal_handler(self.loop, self.bob_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
        self.bob()

    def bob_receive(self, message):
        logger.debug('Bob has received message: {}'.format(message))

    def bob(self):
        def poke_alice():
            dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
            self.loop.call_later(3, poke_alice)
        self.loop.call_later(3, poke_alice)
        self.loop.run_forever()

因此,当消息从 Bob 到达 Alice 时,信号处理程序实际上并不执行处理消息的工作,而只是安排执行该工作的实际处理程序的执行。实际的处理程序将在 Alice 的线程中被调用。

在这种情况下:

  1. 信号最初总是近乎即时地处理,从不阻塞发送者的线程。
  2. Alice 的代码总是在Alice 的线程中执行。 Bob 的代码总是在 Bob 的线程中执行。

所以我的目标实现了。

你们认为这是一个好的解决方案吗?很想听听对此的评论。

关于python - 在线程之间使用 pydispatch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47481909/

相关文章:

python - 无法检查数字是否完美

python - 创建 Pony ORM 实体而不获取相关对象?

java - 通过 Java ProcessBuilder 激活 virtualenv

ruby - 如何从另一个线程捕获异常

python - Python如何设置安装目录?

python-2.7 - wget 和 urllib 无法从 python 3 上的 url 加载文件

python - OpenCV Python : How to save name of the recognized face from face recognition program after the face is recognised?

Python 3.x 读取带标题的文件(已识别标题之间的数据)

javascript - Django 模板中的 List.pop() 没有 for 循环

python - SQLAlchemy未将Python池线程识别为单独的进程