python - multiprocessinq.Queue 作为 Queue.Queue 子级的属性

标签 python multithreading queue multiprocessing

我试图弄清楚以下模块正在做什么。

import Queue
import multiprocessing
import threading

class BufferedReadQueue(Queue.Queue):
    def __init__(self, lim=None):
        self.raw = multiprocessing.Queue(lim)
        self.__listener = threading.Thread(target=self.listen)
        self.__listener.setDaemon(True)
        self.__listener.start()
        Queue.Queue.__init__(self, lim)

    def listen(self):
        try:
            while True:
                self.put(self.raw.get())
        except:
            pass

    @property
    def buffered(self):
        return self.qsize()

它仅在调用代码中实例化一次,.raw 属性 multiprocessing.Queue 被发送到另一个类,该类似乎继承自 multiprocessing.Process.

因此,正如我所看到的,BufferedReadQueue 的一个属性被用作队列,但不是类(或其实例)本身。

如果 BufferedReadQueue 实际上没有被用作队列,那么它继承自 Queue.Queue 而不仅仅是 object 的原因是什么?

最佳答案

看起来像BufferedReadQueue旨在用作转换 multiprocessing.Queue 的读取端的方法进入正常Queue.Queue 。请注意 __init__ 中的这一点:

    self.__listener = threading.Thread(target=self.listen)
    self.__listener.setDaemon(True)
    self.__listener.start()

这会启动一个监听器线程,该线程不断尝试 get内部项目multiprocessing.Queue ,然后 put将所有这些项目发送至 self 。看起来用例是这样的:

def func(queue):
   queue.put('stuff')
   ...

buf_queue = BufferedReadQueue()
proc = multiprocessing.Process(target=func, args=(buf_queue.raw,))
proc.start()
out = buf_queue.get()  # Only get calls in the parent

现在,您为什么要这样做而不是仅使用 multiprocessing.Queue直接地?可能是因为multiprocessing.Queue有一些缺点Queue.Queue没有。例如qsize() ,这BufferedReadQueue使用,is not reliable with multiprocessing.Queue :

Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

Note that this may raise NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented.

也可以内省(introspection) Queue.Queue ,并在不弹出内容的情况下查看其内容。这对于 multiprocessing.Queue 来说是不可能的。 .

关于python - multiprocessinq.Queue 作为 Queue.Queue 子级的属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26167797/

相关文章:

python - 如何快速比较两个文件?

c - 启用 for 循环的强制矢量化

c++ - 为什么从 `std::async` 阻塞返回的 future 的析构函数?

Python 与空闲进程的进程间通信

Laravel "queue:prune-batches"未定义

java - PriorityQueue 元素未排序

python - 使用 slider 更新均匀分布的网格中的值

python - 定义仅在矩形子区域中具有非零元素的二维矩阵

c++ - 如何编写函数和成员函数的包装器,在包装函数之前和之后执行一些代码?

c++ - 带有 std::thread 和 std::chrono 的基本计时器