python-3.x - 为什么“multiprocessing.Queue.get”这么慢?

标签 python-3.x message-queue python-multiprocessing

我需要帮助来理解multiprocessing.Queue。我面临的问题是,与调用queue.get(...)和队列的缓冲区(双端队列)相比,从queue.put(...)获取结果的时间荒谬。

这种泄漏的抽象使我研究了队列的内部。它简单易懂的source code只是将我指向deque implementation,这似乎也很简单,以至于我无法用它来解释我所看到的行为。我还阅读了Queue使用管道的信息,但是我似乎在源代码中找不到它。

我将其简化为一个重现该问题的最小示例,并在其下方指定了可能的输出。

import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process Doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))


输出:

empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28


我希望您注意到以下有关结果的信息:插入元素28001之后,工作人员发现队列中没有剩余的元素,而还有数十个元素。由于同步,我可以只获取其中的少数几个。但是它只能设法找到两个!

而且这种模式还在继续。

这似乎与我放入队列中的对象的大小有关。对于较小的对象,例如说i而不是list(range(i)),则不会出现此问题。但是正在谈论的对象的大小仍然是千字节,不足以引起严重的延迟(在我的非最小示例中,这很容易花了几分钟)

我的问题特别是:如何在Python的各个进程之间共享(而不是如此)大量数据?
另外,我想知道Queue的内部实现在哪里发生?

最佳答案

我也遇到了这个问题。我正在发送大型的numpy数组(约300MB),而在mp.queue.get()上是如此慢。
在仔细研究mp.Queue的python2.7源代码之后,我发现(在类Unix系统上)最慢的部分是socket_connection.c中的_conn_recvall(),但我没有更深入地了解。
要解决此问题,我构建了一个实验软件包FMQ

该项目的灵感来自于multiprocessing.Queue(mp.Queue)的使用。由于管道的速度限制(在类似Unix的系统上),mp.Queue对于大型数据项比较慢。
借助mp.Queue处理进程间传输,FMQ实现了一个窃听器线程,一旦有可用项,该线程就会从mp.Queue中窃取一个项,并将其放入Queue.Queue。然后,使用者进程可以立即从Queue.Queue中获取数据。
提速基于以下假设:生产者和消费者过程都需要大量计算(因此需要多处理),并且数据量很大(例如> 50 227x227图像)。否则,带有多重处理的mp.Queue或带有线程的Queue.Queue就足够了。

fmq.Queue像mp.Queue一样容易使用。
请注意,由于该项目尚处于初期阶段,因此仍有一些Known Issues

关于python-3.x - 为什么“multiprocessing.Queue.get”这么慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47085458/

相关文章:

python - 多处理池不适用于嵌套函数

python - 在 Pygame 中绘制半圆

python - 类型错误 : 'Food' object does not support indexing

python - 如何在 Python 3 中打印异常?

redis - 将数据从 Redis 从站传播到 SQL 数据库

c# - 排队 ObservableCollection 更新

python - Dask 因 freeze_support 错误而失败

带有 "Position only parameter"的 Python 函数

python - ZeroMQ worker 应该如何安全地 "hang up"?

python - 为什么我的 Python 应用程序停滞在 'system'/内核 CPU 时间