python - 如何加快多处理队列的同时读写速度?

标签 python queue multiprocessing

tl;dr - 有没有办法提高同时读写多处理队列的速度?

我有一个处理审计数据的应用程序。将其视为系统日志中继。它接收数据,解析数据,然后继续发送事件。事件发生率可能很高 - 我的目标是每秒 15,000 个事件 (EPS)。

in_queue = multiprocessing.Queue()

out_queue = multiprocessing.Queue()

  • ReaderProc - 单个过程,套接字读取器,接收数据并将其放入 in_queue使用in_queue.put()
  • ParserProcs - 多个过程,使用 in_queue.get()获取数据,处理数据,然后将完成的结果放入 out_queue使用out_queue.put()
  • WriterProc - 单个过程,读取 out_queue使用out_queue.get()并通过 TCP 套接字连接发送数据

我使用队列运行测试 - 我可以将事件以 25,000 EPS 的速度放入队列中。当多个解析进程 (4) 在写入数据时将数据从队列中拉出时,就会出现速度减慢的情况。利率降至每股 10,000 以下。我猜测底层管道、锁等是造成延迟的原因。

我阅读了管道,看起来它只支持 2 个端点。我需要将 CPU 密集型解析 fork 到多个进程。多处理内存共享等替代方法能否取得更好的结果?我怎样才能更好地同步.put().get()来自队列的操作?

最佳答案

考虑到您的性能需求,我认为您最好使用第三方消息代理,例如 ZeroMQRabbitMQ为了这。我找到了一个比较多个的基准 here (尽管它与您的用例不太匹配)。性能差异巨大:

多处理。队列结果

1
2
3

python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq 结果

1
2
3

python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

我进行了这两项测试,并提供了更复杂的工作负载,因为 multiprocessing.Queue 的好处之一是它可以为您处理序列化。这是新脚本:

mult_queue.py

import sys
import time
from  multiprocessing import Process, Queue

def worker(q):
    for task_nbr in range(1000000):
        message = q.get()
    sys.exit(1)

def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        send_q.put(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

multi_zmq.py

import sys
import zmq
from  multiprocessing import Process
import time
import json
import cPickle as pickle

def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(1000000):
        message = work_receiver.recv_pyobj()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        ventilator_send.send_pyobj(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

输出:

dan@dan:~$ ./mult_zmq.py 
Duration: 14.0204648972
Messages Per Second: 71324.3110935
dan@dan:~$ ./mult_queue.py 
Duration: 27.2135331631
Messages Per Second: 36746.4229657

关于python - 如何加快多处理队列的同时读写速度?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23961669/

相关文章:

python - login_required 装饰器不起作用,flask-Login 允许匿名用户

c# - 使用线程池的正确方法是什么?

python - 如何使用远程管理器传递 python 对象?

php - 使用 popen 执行多处理脚本时输出困惑

python - 使用 vispy 以灰度显示图像

python - for 循环比 numpy 平均值快,结果也有点不同

laravel - 在 Laravel 中,如何控制 Job 是通过队列处理还是同步处理

Python 多处理 : processes do not start

python - 在 Tkinter.Tcl() 中使用 Python 函数

sql-server-2005 - 带有 Sql Server 2005 的服务代理 - 消息卡在队列中