Python线程,Event和Queue如何协同工作?

标签 python multithreading concurrency queue python-multithreading

在看了比斯利书中的例子后,我正在和我的 friend 聊天

class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        while True:
            msg = self.recv()

class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

我的 friend 争辩说 Event 的唯一目的是阻塞主线程,直到另一个线程执行设置操作。 真的吗? 我们如何观察线程执行?

最佳答案

Python threads, how do Event and Queue work together?

他们没有。您可以使用没有队列的事件和没有事件的队列,彼此之间没有依赖关系。您的示例恰好同时使用了两者。

My friend argues that sole purpose of Event is to block the main thread until the other thread performs set operation. Is that true?

在事件对象上调用 .wait() 将阻塞任何调用线程,直到内部标志为 .set()

如果您查看 Event 的源代码,您会发现 Events 只包含一个带锁的 Condition 变量和一个 bool 标志 + 处理和传达(与等待线程)该标志的状态变化的方法。

class Event:

    """Class implementing event objects.
    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False
    ...

How can we watch thread execution?

一个简单的方法是应用某种实用函数来打印出您感兴趣的内容,例如:

def print_info(info=""):
    """Print calling function's name and thread with optional info-text."""
    calling_func = sys._getframe(1).f_code.co_name
    thread_name = threading.current_thread().getName()
    print(f"<{thread_name}, {calling_func}> {info}", flush=True)

另一种可能性是使用类似 answer 中的日志记录.


不确定 Beazly 想要用您展示的代码演示什么,但对于这个简单的任务,我认为它有点过度设计。当您已经使用队列时,无需在顶部涉及事件。您可以通过传递标记值来初始化线程终止。

这是您的示例的简化版本,其中包含哨兵 ('STOP') 和来自上面的 print_info 的一些信息打印:

import sys
import time
import threading
from queue import Queue


class Actor(threading.Thread):

    def __init__(self):
        super().__init__(target=self.run)
        self.queue = Queue()

    def send(self, msg):
        self.queue.put(msg)
        print_info(f"sent: {msg}")  # DEBUG

    def close(self):
        print_info()  # DEBUG
        self.send('STOP')

    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            pass


class PrintActor(Actor):
    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            print_info(f"got: {msg}")  # DEBUG


if __name__ == '__main__':

    pa = PrintActor()
    pa.start()
    pa.send("Hello")
    time.sleep(2)
    pa.send("...World!")
    time.sleep(2)
    pa.close()
    pa.join()

输出:

<MainThread, send> sent: Hello
<Thread-1, run> got: Hello
<MainThread, send> sent: ...World!
<Thread-1, run> got: ...World!
<MainThread, close> 
<MainThread, send> sent: STOP

关于Python线程,Event和Queue如何协同工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52075515/

相关文章:

python regex unicode - 从 utf-8 文件中提取数据

python - multiprocessing.Pool 中的 up.close 和 p.join

java - java中延迟后调用特定函数

Java executorService.shutdownNow() 不取消或中断线程

Spring 的 WebUtils.getSessionMutex(javax.servlet.http.HttpSession) 和 HttpSessionMutexListener 仍然相关

python - 怎么强制去上课?

python - 如何使用 Python 将文本文件中的数据读取到数组中

Java : improve the smoothness of a slow thread animation?

.net - 有没有办法确定 .NET 线程何时终止?

c# - 没有多线程异步/等待的并发