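ZMQ beginner here. I implemented a PUB-SUB network with a proxy. The idea is to plug multiple websockets into my proxy as PUBs, feeding one aggregated feed (SUB). I used the inproc protocol because everything happens in the same process.
I wrote the code below. I get 400 to 500 FPS, which is far too slow.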
import time
import random
import threading
import zmq

# Channels from SENDERS (PUB) to PROXY
channels = ["inproc://first", "inproc://second"]
# Channel from PROXY to RECEIVER
outbound = "inproc://to_aggregate"
numrec = 0
ctx = zmq.Context.instance()
lock = threading.Lock()

def sender(context, address):
    socket = context.socket(zmq.PUB)
    socket.connect(address)
    while True:
        twait = random.randint(1,3)
        tosend = f'{twait} from {threading.current_thread()}'.encode()
        socket.send(tosend)

def receiver(context):
    global numrec
    socket = context.socket(zmq.SUB)
    socket.connect(outbound)
    topicfilter = '' # As string, encoded to bytes later on
    socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())
    while True:
        resp = socket.recv()
        with lock:
            # TOOK OUT A PRINT STATEMENT, WAS SLOWING DOWN THE LOOP
            numrec += 1

def middleman(context):
    data_in = context.socket(zmq.XSUB)
    [data_in.bind(channel) for channel in channels]
    data_out = context.socket(zmq.XPUB)
    data_out.bind(outbound)
    zmq.proxy(data_in, data_out)

exchpub = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[0]), daemon=True)
exchpub2 = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[1]), daemon=True)
exchsub = threading.Thread(target=receiver, name='THE_TOPLEVEL_SUB', args=(ctx,), daemon=True)
proxy = threading.Thread(target=middleman, name='THE_PROXY', args=(ctx,), daemon=True)
threadlist = [exchpub, exchpub2, exchsub, proxy]
[i.start() for i in threadlist]

secwait = 5
t=tzero=time.time()
while t-tzero < secwait:
    t = time.time()
with lock:
    print(f'exited here and {numrec/secwait} FPS')
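Here is my main question:
Why is this so slow?
Follow-up questions:
1) The ZMQ documentation states: "INPROC: the server must issue a bind before any client issues a connect." However, I observed no failures regardless of the initialization order. Why?
2) Using send_multipart and recv_multipart slows my code down (sending an iterable with len == 2 roughly halves the speed). So why should I ever use it? If the speed difference were small, I would happily use it like this: (SOURCE, PAYLOAD, TIMESTAMP).
3) How would you profile the speed of code like this? Any suggestions on the implementation?
Thanks.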
Best answer
You are running all of your threads in a single process.
Given how your code is written, this is very likely what degrades performance.
The cause is Python's GIL (Global Interpreter Lock):
(...) the GIL allows only one thread to execute at a time, even in a multi-threaded architecture with more than one CPU core (...)
See: https://medium.com/python-features/pythons-gil-a-hurdle-to-multithreaded-program-d04ad9c1a63 (and many other sources).
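To see the effect described in the quote, here is a minimal standard-library sketch. It assumes a regular CPython build with the GIL enabled; the names spin, timed_sequential and timed_threaded are made up for this illustration and have nothing to do with ZMQ. It times the same CPU-bound loop run twice back to back and then in two threads at once.

```python
import threading
import time


def spin(iterations):
    """CPU-bound pure-Python loop; the running thread holds the GIL."""
    count = 0
    for _ in range(iterations):
        count += 1
    return count


def timed_sequential(iterations, workers=2):
    """Run the loop `workers` times back to back; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(workers):
        spin(iterations)
    return time.perf_counter() - start


def timed_threaded(iterations, workers=2):
    """Run the loop in `workers` threads at once; return elapsed seconds."""
    threads = [threading.Thread(target=spin, args=(iterations,))
               for _ in range(workers)]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 2_000_000  # arbitrary workload size, chosen for the example
    print(f"sequential: {timed_sequential(n):.3f} s")
    print(f"threaded:   {timed_threaded(n):.3f} s")
```

On a GIL-enabled interpreter the two timings usually come out close to each other, because only one thread executes Python bytecode at any moment. Exact numbers depend on the machine and the Python version.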
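For your program, this means that the sender and receiver loops spend a lot of their time waiting for the GIL. To avoid this, use Python's multiprocessing library, as described here: https://timber.io/blog/multiprocessing-vs-multithreading-in-python-what-you-need-to-know/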
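As a rough sketch of that suggestion, the example below runs each sender loop in its own process, so each one has its own interpreter and its own GIL. To stay self-contained it uses multiprocessing.Queue in place of ZMQ sockets, which is an assumption of this sketch and not something the answer prescribes. The names sender, receive_all, main and the STOP sentinel are hypothetical. With ZeroMQ itself, note that inproc:// endpoints only connect sockets that share one context, so they cannot span processes; a multi-process version would need a transport such as ipc:// or tcp://.

```python
import multiprocessing as mp
import time

STOP = None  # hypothetical sentinel: marks the end of one sender's stream


def sender(queue, name, n_messages):
    """Produce n_messages payloads, then signal completion with STOP."""
    for i in range(n_messages):
        queue.put(f"{i} from {name}".encode())
    queue.put(STOP)


def receive_all(queue, n_senders):
    """Count payloads until every sender has delivered its STOP sentinel."""
    received = 0
    finished = 0
    while finished < n_senders:
        if queue.get() is STOP:
            finished += 1
        else:
            received += 1
    return received


def main(n_senders=2, n_messages=10_000):
    queue = mp.Queue()
    senders = [
        mp.Process(target=sender, args=(queue, f"sender-{i}", n_messages),
                   daemon=True)
        for i in range(n_senders)
    ]
    start = time.perf_counter()
    for proc in senders:
        proc.start()
    # Drain the queue before joining, so no sender blocks on a full pipe.
    received = receive_all(queue, n_senders)
    elapsed = time.perf_counter() - start
    for proc in senders:
        proc.join()
    print(f"{received} messages in {elapsed:.2f} s "
          f"({received / elapsed:.0f} per second)")


if __name__ == "__main__":
    main()
```

The receiver blocks in queue.get() and does not spin on a clock, so the measuring code does not compete with the workers for CPU time. The rate it prints depends on the machine and says nothing about what ZMQ would achieve.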
Source: Stack Overflow question "PUB-SUB network with proxy: How to improve number of frames per second" (python-3.x): https://stackoverflow.com/questions/54627021/