python - Python多线程通信效率

标签 python multithreading performance queue python-multithreading

我是python多任务处理的新手。我用老式的方式做的:

我从threading.Thread继承并使用queue.Queue队列向主线程发送消息或从主线程发送消息。

这是我的基线程类:

class WorkerGenerico(threading.Thread):
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        super(WorkerGenerico, self).__init__()
        self._task_id = task_id
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if not isinstance(keep_alive, int):
            raise TypeError("El valor de keep_alive debe der un int.")
        self._keep_alive = keep_alive
        self.stoprequest = threading.Event()

    # def run(self):
    #    Implement a loop in subclases which checks if self.has_orden_parada() is true in order to stop.

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerGenerico, self).join(timeout)

    def gracefull_stop(self):
        self.stoprequest.set()

    def has_orden_parada(self):
        return self.stoprequest.is_set()

    def put(self,texto, block=True, timeout=None):
        return self._input_q.put(texto, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)


我的问题是,从外部调用WorkerGenerico.get()的成本很高,这是因为将队列队列存储在主线程中并使用Queue.get()。这两种方法在性能上相似,并且带有少量的非频繁控制消息,但是,我认为非常频繁的调用将使方法B值得使用。

我猜想模式A会消耗更多的资源(它必须以某种方式从外部线程调用该方法并将队列定义传回,我猜损失取决于Python的实现),但是最终代码更具可读性和直观性。

如果我不得不从其他语言的经验中分辨出来,我会说方法B更好,对吗?

方法A:

def main()
    worker = WorkerGenerico(task_id=1)
    worker.start()
    print(worker.get())


方法B:

def main()
    input_q = Queue()
    output_q = Queue()
    worker = WorkerGenerico(task_id=1, input_q=input_q, output_q=output_q)
    worker.start()
    print(output_q.get())


顺便说一句:为了完整起见,我想分享我现在的做法。它是两种方法的混合,为线程提供了良好的包络:

class EnvoltorioWorker:
    def __init__(self, task_id, input_q=None, output_q=None, keep_alive=300):
        if input_q is None:
            self._input_q = queue.Queue()
        else:
            if isinstance(input_q, queue.Queue):
                self._input_q = input_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        if output_q is None:
            self._output_q = queue.Queue()
        else:
            if isinstance(output_q, queue.Queue):
                self._output_q = output_q
            else:
                raise TypeError("input_q debe ser del tipo queue.Queue")
        self.worker = WorkerGenerico(task_id, input_q, output_q, keep_alive)

    def put(self, elem, block=True, timeout=None):
        return self._input_q.put(elem, block=block, timeout=timeout)

    def get(self, block=True, timeout=None):
        return self._output_q.get(block=block, timeout=timeout)


我使用EnvoltorioWorker.worker。*来调用联接或其他外部控制方法,并使用EnvoltorioWorker.get / EnvoltorioWorker.put与内部类正确通信,如下所示:

def main()
    worker_container = EnvoltorioWorker(task_id=1)
    worker_container.worker.start()
    print(worker_container.get())


通常,如果不需要其他对worker的访问,我还会在EnvoltorioWorker中为start(),join()和nonwait_stop()创建接口。

它可能看起来很虚拟,并且可能有更好的方法可以实现这一目标,因此:

哪种方法(A或B)是更好的做法?从Thread继承是处理Python中线程的正确方法吗?我将Dispycos用于分布式环境和类似的信封以与线程通信

编辑:刚注意到我忘了在类中翻译注释和一些字符串,但是它们足够简单,因此我认为它是可读的。有空的时候我会编辑它。

有什么想法吗?

最佳答案

您的队列未真正存储在线程中。假设这里使用CPython,所有对象都存储在堆中,并且线程仅具有私有堆栈。堆中的对象在同一进程中的所有线程之间共享。


  Python中的内存管理涉及一个包含所有Python对象和数据结构的私有堆。此私有堆的管理由Python内存管理器在内部确保。 Python内存管理器具有不同的组件,这些组件处理各种动态存储管理方面的问题,例如共享,分段,预分配或缓存。 docs


由此得出结论,您的对象(您的队列)位于何处不是问题,因为它始终在堆上。 Python中的变量(名称)只是对这些对象的引用。

影响运行时间的是通过嵌套函数/方法调用将多少个调用框架添加到堆栈中,以及需要多少字节码指令。那么这对时间安排有什么影响?



基准测试

考虑以下针对队列和工作程序的虚拟设置。为了简单起见,此处没有为虚拟工作人员提供线程,因为在我们假装只耗尽预先填充的队列的情况下,对其进行线程化不会影响时间。

class Queue:
    def get(self):
        return 1

class Worker:
    def __init__(self, queue):
        self.queue = queue
        self.quick_get = self.queue.get # a reference to a method as instance attribute

    def get(self):
        return self.queue.get()

    def quick_get_method(self):
        return self.quick_get()


可以看到,Worker有两种版本的get-method,get用一种定义方式,而quick_get_method是一种字节码指令,比我们稍后看到的要短。 worker实例不仅保存对queue实例的引用,而且还直接通过queue.get引用self.quick_get,这是我们保留一条指令的地方。

现在是在IPython会话中对来自伪队列的所有可能性进行基准测试的时间:

q = Queue()
w = Worker(q)

%timeit q.get()
285 ns ± 1.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.get()
609 ns ± 2.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get()
286 ns ± 0.756 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit w.quick_get_method()
555 ns ± 0.855 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


请注意,.get()q.get()之间的时序没有差异。
还请注意,与传统的w.quick_get()相比,w.quick_get_method()的计时有所改善。与w.get()Worker-method相比,使用get()在队列上调用q.get()的时间仍然几乎翻倍。这是为什么?

通过使用w.quick_get()模块,可以获得解释器正在处理的Python字节码指令的人类可读版本。

import dis

dis.dis(q.get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.get)
  8           0 LOAD_FAST                0 (self)
              2 LOAD_ATTR                0 (queue)
              4 LOAD_METHOD              1 (get)
              6 CALL_METHOD              0
              8 RETURN_VALUE

dis.dis(w.quick_get)
  3           0 LOAD_CONST               1 (1)
              2 RETURN_VALUE

dis.dis(w.quick_get_method)
 11           0 LOAD_FAST                0 (self)
              2 LOAD_METHOD              0 (quick_get)
              4 CALL_METHOD              0
              6 RETURN_VALUE


请记住,这里的虚拟dis仅返回1。您会看到Queue.getq.get相同,这也反映在我们之前看到的计时中。请注意,w.quick_get直接加载w.quick_get_method,这只是对象quick_get所引用的另一个名称/变量。

您还可以借助queue.get模块获取打印出的纸叠深度:

def print_stack_depth(f):
    print(*[s for s in dis.code_info(f).split('\n') if
            s.startswith('Stack size:')]
    )

print_stack_depth(q.get)
Stack size:        1 
print_stack_depth(w.get)
Stack size:        2
print_stack_depth(w.quick_get)
Stack size:        1
print_stack_depth(w.quick_get_method)
Stack size:        2


不同方法之间的字节码和时序差异暗示(不足为奇),添加另一帧(通过添加另一种方法)将对性能造成最大的影响。



评论

上面的分析并不是不使用额外的Worker方法来调用引用对象(queue.get)上的方法的隐式请求。出于可读性考虑,记录日志和简化调试是正确的做法。例如,您还可以在Stdlib的dis中找到类似Worker.quick_get_method的优化,该优化也在内部使用队列。

从基准测试的角度来看,几百纳秒的时间并不多(对于Python)。在Python 3中,线程可以容纳GIL的默认最大时间间隔为5毫秒,因此可以一口气执行字节码。那是5 * 1000 * 1000纳秒。

无论如何,与开销多线程引入相比,几百纳秒也很小。我发现,例如,在一个线程中的multiprocessing.pool.Pool之后添加20 µs睡眠,而在另一个线程中仅从队列中读取,则平均导致每次迭代大约64.0μs的额外开销(不包括20μs的睡眠) )的范围为100k(Python 3.7.1,Ubuntu 18.04)。



设计

关于您对设计偏好的问题,我肯定会在这里选择方法A,而不是方法B。甚至在万一您的队列未跨多个线程使用的情况下,甚至更多。在您仅内部使用一个queue.put(integer)实例(而不是工作线程池)的情况下,IMO在最后一个代码段中混合创建的内容会不必要地使事情/理解变得不必要。与方法A相反,您的工作人员的“线程性”也深埋在另一个类的内部。

关于python - Python多线程通信效率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53641407/

相关文章:

c++ - QThreads 的并发问题。接收相同信号的线程相互阻塞

performance - 用于处理历史记录的 ETL

java - 如何对一个java方法的内部实现进行基准测试?

java - 在库中实现同步和异步方法的正确方法是什么?

python - 使用 Python 从 HTML 中提取歌曲长度和大小

python - 循环数据帧时计算行的出现次数

multithreading - Haskell - 基于 Actor 的可变性

c# - 线程问题

python - 在不转换为列表的情况下随机采样 python 集合

python - 使用Python将CSV导入MySQL表