带有发送中的hickup的Python UDP Streamer
我目前正在开发python 3.4网络流应用程序。而且我的套接字有一些疯狂的行为。 (如果可能,则与目标3.3兼容)
定义:当我谈论Stream时,是指UDP-Stream。
The problem
While sending the socket.send operation sometimes start take 1-3ms, as i will describe more below the transfer target is much higher. I found other threads here telling about problems with speed, but they handled to send 200k packages a second, but they only send "A". In my case each packet is 1500 Bytes inc. UDP and IP header added by socket. Please see my explains below if the problem not is clear at this point.
Question
Does anyone have an idea why this delays? Or how to speed up sending to reach real time?
我的测试代码如下所示:
def _transfer(self):
self.total_num_samps_sent = 0
self.sequence_out = 0
self.send_in_progress = True
send = self.udp_socket.send
for i in range(0, len(streams), 1):
stream_data, stream_samps, stream_seq = self.packed_streams[i]
# commit the samples
start_try_send_time = monotonic()
while not self.ready():
if monotonic() - start_try_send_time > self.timeout > 0:
# timeout; if timeout == 0 wait endless
return 0
self.sequence_out = stream_seq
# ######################
# Here is the bottleneck
# ######################
s = monotonic()
send(stream_data)
e = monotonic()
if e-s > 0:
print(str(i) + ': ' + str(e-s))
# #####################
# end measure monotonic
# #####################
self.total_num_samps_sent += stream_samps
self.send_in_progress = False
self.packed_streams contains a list of tuples (data_in_bytes(), number_samples_in_this_stream, sequence_out) the function self.ready() returns True when the targed ACK'ed enough packets send (has free RAM).
特殊标记的瓶颈有更详细的描述:请多一点了解
套接字的创建如下所示:
self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
error = ('OS Error: {0}'.format(os_error)
+ linesep + 'IP src: ' + str(self.ip_own)
+ linesep + 'IP dst: ' + str(self.ip_usrp)
+ linesep + 'Port: {0}'.format(port))
exit(error)
self.udp_socket.connect(self.target)
# not helps to set to non blocking
# self.udp_socket.setblocking(False)
sendfunction(第一个代码块)作为单独的线程运行。
UDPFlowControl也产生另一个线程。与发送流媒体在同一套接字上运行(流媒体继承FlowControl并使用其就绪状态)
UDP流量控制
def _worker(self):
"""
* Receive Loop
* - update flow control condition count
* - put async message packets into queue
"""
self.send_here_am_i()
while 1:
ready = select([self.udp_socket], [], [], self.socket_timeout)
if ready[0]:
try:
data_in = self.udp_socket.recv(2048)
except:
# ignore timeout/error buffers
continue
# with suppress(Exception): #ToDo Reenable after test is done
bytes_in = len(data_in)
self.data_received += bytes_in
# extract the vrt header packet info
vrt = VRTImplementation()
vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
continue
# handle a tx async report message
if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
# fill in the async metadata
metadata = MetadataAsync()
metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
self.tick_rate)
# catch the flow control packets and react
if metadata.event_code == EventCode.FLOW_CONTROL:
self.sequence_in = \
unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
continue
self.async_msg_fifo.append(metadata)
else:
# TODO: unknown packet
pass
def ready(self):
"""
Check if less ack are outstanding than max allowed
:returns bool: if device can get more data
"""
return self.sequence_out - self.sequence_in < self.max_sequence_out
个人资料
<<删除了旧基准>>如果需要此信息,请再次查看历史!
如上所述,单调分析是我提出问题的原因。如您所见,时间为0会被忽略。输出看起来像这样:(该流包含5秒的数据(发送2754,8个字节流),其结果大小(wireshark)每个为1500字节
Send: 445.40K of 5.00M, Sending: True @ monotonic time: 44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send: 724.18K of 5.00M, Sending: True @ monotonic time: 44927.3200
....
第一个数字是延迟打包的索引。第二个数字是此延迟的差分时间单调。未在此处显示,但在我的日志中,我发现了一些计时,例如7582:0.030999999995401595,有时在0.06时会高得多...
以Send开头的行是将当前状态写入控制台的Main Thread。写入后进入休眠状态250ms。
我的问题是,当前系统仅以目标速度的1/25运行,并且已经启动了此hickup,如您在cProfile中所看到的,这需要近30秒才能发送5秒的流。在每个1500Bytes处,目标速度将为68870P/s,在GbE => 125MByte/s的限制下,包含开销的〜98,5MByte。
这是单个目标应用程序。通常情况下,它无需任何路由器,交换机就可以通过网络直接连接到设备。因此,网络仅属于此应用程序和设备。
到目前为止,我所做的是:
请记住,在所有测试中,仅可以调试print命令。单调调用的一半也用于调试目的。
<<删除了旧基准>>如果需要此信息,请再次查看历史!
在Windows 7 x64和Python 3.4.2上运行。 @ Corei7 2630QM和8GB RAM
<<删除了旧基准>>如果需要此信息,请再次查看历史!
编辑3
首先,因为我可以快速回答cProfile在线程中运行,所以_worker仍然是未剖析的第二线程,因为等待准备工作所用的时间很短(总计〜0.05),我猜想它运行得足够快。 _send函数是线程入口,更多的是包装器,它可以cProfile此线程。
def _send(self):
profile = cProfile.Profile()
profile.enable()
self._transfer()
profile.disable()
profile.print_stats()
禁用超时并重新运行分析需要等待1或2天,我目前正在清理代码,因为仍然有线程处于后台处于挂起状态( sleep 250毫秒),我认为让它们死掉并在使用时重生不是问题。完成此操作后,我将重试测试。我认为关于GIL的更多内容是这里的弊端。可能是在流程控制中打开传入数据包的过程以及在线程之间进行切换的过程,这可能会花费一些时间并导致此问题。 (如果我理解GIL正确-只有一个线程可以一次执行python代码,但是我想知道为什么这总是会触发套接字操作,而不是像40/60-50/50那样更平等地分配ready和send调用)因此,在我的待办事项 list 上有一个 future 包,可以在Processes上真正使用多核。为了测试这一点,我将将ready永久返回的值设置为True,并且将FlowControl Thread设置为不在第一条命令中启动或返回。
该程序的目标是在Linux,Windows,Mac和Unix上运行。
编辑4
首先关于线程-此处没有提到它们的优先级:Controlling scheduling priority of python threads?
我相信没有办法改变它。运行Python的核心最大速度为25%。调试器运行时,整个系统的负载约为10%。
选择运行只是一个测试。我在发送例程中删除了选择代码,并在有和没有超时的情况下进行了测试:
<<删除了旧基准>>如果需要此信息,请再次查看历史!
线程清除的旧代码示例
在此示例中,我杀死了所有线程,而不是让它们进入休眠状态。而且主线程会睡更多的时间。
没有FlowControl @ 5M
41331 function calls in 2.935 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 2.007 2.007 2.935 2.935 SendStreamer.py:297(_transfer)
13776 0.005 0.000 0.005 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
13776 0.007 0.000 0.007 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.915 0.000 0.915 0.000 {method 'send' of '_socket.socket' objects}
使用FlowControl @ 5M
在这里等待设备要比在发送上花费更多的时间。
68873 function calls in 5.245 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 4.210 4.210 5.245 5.245 SendStreamer.py:297(_transfer)
27547 0.030 0.000 0.030 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
27547 0.011 0.000 0.011 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.993 0.000 0.993 0.000 {method 'send' of '_socket.socket' objects}
仍然开放:分解为多个流程。 -仍根据流程使用情况重构类结构(我想可能要在最后一刻添加一些新结果)。在进行一些更详细的基准测试时,我发现第二个线程(VRT的解压缩)几乎花费了每次启动的时间。对于流程,这不再是造成速度下降的可能原因。
我希望有所有必要的信息,如果我忘记了一些请询问!
[Edit1]在“我已完成的工作”列表中添加了信息
[Edit2]添加了第二个测试系统(Manjaro)的cProfiles
[Edit3]添加了有关cProfile运行方式的信息。
[Edit4]更多cProfiles +有关线程的答案
[Edit5]删除了旧基准
最佳答案
我可以在Linux上以无特权用户python2的身份确认这一点。
我认为您无能为力:
# timing code:
In [16]: @contextlib.contextmanager
....: def timeit():
....: st = time.time()
....: yield
....: en = time.time()
....: b = int(math.log10(en - st))
....: data.setdefault(b, 0)
....: data[b] += 1
# Thus, timing data means:
-6: number of times send took between 0.00000011 and 0.000001s
-4: 0.0000011 ~ 0.00001
-4: 0.000011 ~ 0.0001
-3: 0.00011 ~ 0.001 (up to millisecond)
-2: 0.0011 ~ 0.01 (1..10ms)
# Regular blocking socket
{-6: 2807, -5: 992126, -4: 5049, -3: 18}
# Non-blocking socket
{-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
# socket with timeout=0
{-6: 2249, -5: 992994, -4: 4749, -3: 8}
# socket with timeout=1
{-5: 994259, -4: 5727, -3: 8, -2: 6}
看起来这种分布的尾部是指数的。
我还增大了发送缓冲区,并偶尔添加了
time.sleep
来给内核时间来发送排队的数据包,但这没有帮助。这是有道理的,因为非阻塞也会偶尔出现缓慢的发送。我还尝试按照http://www.pycopia.net/_modules/pycopia/socket.html
outq
函数显式等待发送队列为空,这也没有改变分发。
关于python - Python UDP套接字发送瓶颈(缓慢/延迟随机),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28846777/