python - Python UDP套接字发送瓶颈(缓慢/延迟随机)

标签 python sockets gil

带有发送中的hickup的Python UDP Streamer

我目前正在开发python 3.4网络流应用程序。而且我的套接字有一些疯狂的行为。 (如果可能,则与目标3.3兼容)
定义:当我谈论Stream时,是指UDP-Stream。

The problem

While sending the socket.send operation sometimes start take 1-3ms, as i will describe more below the transfer target is much higher. I found other threads here telling about problems with speed, but they handled to send 200k packages a second, but they only send "A". In my case each packet is 1500 Bytes inc. UDP and IP header added by socket. Please see my explains below if the problem not is clear at this point.

Question

Does anyone have an idea why this delays? Or how to speed up sending to reach real time?



我的测试代码如下所示:

def _transfer(self):
    self.total_num_samps_sent = 0
    self.sequence_out = 0
    self.send_in_progress = True
    send = self.udp_socket.send
    for i in range(0, len(streams), 1):
        stream_data, stream_samps, stream_seq = self.packed_streams[i]
        # commit the samples
        start_try_send_time = monotonic()
        while not self.ready():
            if monotonic() - start_try_send_time > self.timeout > 0:
                # timeout; if timeout == 0 wait endless
                return 0
        self.sequence_out = stream_seq
        # ######################
        # Here is the bottleneck
        # ######################
        s = monotonic()
        send(stream_data)
        e = monotonic()
        if e-s > 0:
            print(str(i) + ': ' + str(e-s))
        # #####################
        # end measure monotonic
        # #####################
    self.total_num_samps_sent += stream_samps
    self.send_in_progress = False

self.packed_streams contains a list of tuples (data_in_bytes(), number_samples_in_this_stream, sequence_out) the function self.ready() returns True when the targed ACK'ed enough packets send (has free RAM).



特殊标记的瓶颈有更详细的描述:请多一点了解

套接字的创建如下所示:

self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
    self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
    error = ('OS Error: {0}'.format(os_error)
             + linesep + 'IP src: ' + str(self.ip_own)
             + linesep + 'IP dst: ' + str(self.ip_usrp)
             + linesep + 'Port: {0}'.format(port))
    exit(error)
self.udp_socket.connect(self.target)
# not helps to set to non blocking
# self.udp_socket.setblocking(False) 

sendfunction(第一个代码块)作为单独的线程运行。
UDPFlowControl也产生另一个线程。与发送流媒体在同一套接字上运行(流媒体继承FlowControl并使用其就绪状态)

UDP流量控制

def _worker(self):
    """
    * Receive Loop
    * - update flow control condition count
    * - put async message packets into queue
    """
    self.send_here_am_i()
    while 1:
        ready = select([self.udp_socket], [], [], self.socket_timeout)
        if ready[0]:
            try:
                data_in = self.udp_socket.recv(2048)
            except:
                # ignore timeout/error buffers
                continue
            # with suppress(Exception):  #ToDo Reenable after test is done
            bytes_in = len(data_in)
            self.data_received += bytes_in
            # extract the vrt header packet info
            vrt = VRTImplementation()
            vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
            if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
                continue
            # handle a tx async report message
            if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
                # fill in the async metadata
                metadata = MetadataAsync()
                metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
                                       self.tick_rate)
                # catch the flow control packets and react
                if metadata.event_code == EventCode.FLOW_CONTROL:
                    self.sequence_in = \
                        unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
                    continue
                self.async_msg_fifo.append(metadata)
            else:
                # TODO: unknown packet
                pass

def ready(self):
    """
    Check if less ack are outstanding than max allowed
    :returns bool: if device can get more data
    """
    return self.sequence_out - self.sequence_in < self.max_sequence_out

个人资料

<<删除了旧基准>>如果需要此信息,请再次查看历史!

如上所述,单调分析是我提出问题的原因。如您所见,时间为0会被忽略。输出看起来像这样:(该流包含5秒的数据(发送2754,8个字节流),其结果大小(wireshark)每个为1500字节
Send:  445.40K of    5.00M, Sending:  True @ monotonic time:   44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send:  724.18K of    5.00M, Sending:  True @ monotonic time:   44927.3200
....

第一个数字是延迟打包的索引。第二个数字是此延迟的差分时间单调。未在此处显示,但在我的日志中,我发现了一些计时,例如7582:0.030999999995401595,有时在0.06时会高得多...

以Send开头的行是将当前状态写入控制台的Main Thread。写入后进入休眠状态250ms。

我的问题是,当前系统仅以目标速度的1/25运行,并且已经启动了此hickup,如您在cProfile中所看到的,这需要近30秒才能发送5秒的流。在每个1500Bytes处,目标速度将为68870P/s,在GbE => 125MByte/s的限制下,包含开销的〜98,5MByte。

这是单个目标应用程序。通常情况下,它无需任何路由器,交换机就可以通过网络直接连接到设备。因此,网络仅属于此应用程序和设备。

到目前为止,我所做的是:
  • 正如您在代码中看到的那样,我将测试最小化到最低限度,流已经在内存中,可以转移到设备,而无需进行更多转换,只需放入套接字即可。
  • 经过测试,可以选择是否已准备好发送套接字,是否单调启动,将数据放入套接字,停止单调并查看结果。
  • 使用wireshark检查网络(在wireshark中有13774个发送调用13774,我数了〜1310个hickups)
  • 认为GIL是原因,但很难弄清楚。
  • 测试时打开防火墙-不变
  • [编辑1]如果套接字可以以目标速度执行,则带有Boost的C++中的Testet,这里也有序列号,但是它们要短100-1000µs(此设备中的1MB缓冲区可以处理)

  • 请记住,在所有测试中,仅可以调试print命令。单调调用的一半也用于调试目的。

    <<删除了旧基准>>如果需要此信息,请再次查看历史!

    在Windows 7 x64和Python 3.4.2上运行。 @ Corei7 2630QM和8GB RAM

    <<删除了旧基准>>如果需要此信息,请再次查看历史!

    编辑3

    首先,因为我可以快速回答cProfile在线程中运行,所以_worker仍然是未剖析的第二线程,因为等待准备工作所用的时间很短(总计〜0.05),我猜想它运行得足够快。 _send函数是线程入口,更多的是包装器,它可以cProfile此线程。
    def _send(self):
        profile = cProfile.Profile()
        profile.enable()
        self._transfer()
        profile.disable()
        profile.print_stats()
    

    禁用超时并重新运行分析需要等待1或2天,我目前正在清理代码,因为仍然有线程处于后台处于挂起状态( sleep 250毫秒),我认为让它们死掉并在使用时重生不是问题。完成此操作后,我将重试测试。我认为关于GIL的更多内容是这里的弊端。可能是在流程控制中打开传入数据包的过程以及在线程之间进行切换的过程,这可能会花费一些时间并导致此问题。 (如果我理解GIL正确-只有一个线程可以一次执行python代码,但是我想知道为什么这总是会触发套接字操作,而不是像40/60-50/50那样更平等地分配ready和send调用)因此,在我的待办事项 list 上有一个 future 包,可以在Processes上真正使用多核。为了测试这一点,我将将ready永久返回的值设置为True,并且将FlowControl Thread设置为不在第一条命令中启动或返回。

    该程序的目标是在Linux,Windows,Mac和Unix上运行。

    编辑4

    首先关于线程-此处没有提到它们的优先级:Controlling scheduling priority of python threads?
    我相信没有办法改变它。运行Python的核心最大速度为25%。调试器运行时,整个系统的负载约为10%。

    选择运行只是一个测试。我在发送例程中删除了选择代码,并在有和没有超时的情况下进行了测试:

    <<删除了旧基准>>如果需要此信息,请再次查看历史!

    线程清除的旧代码示例

    在此示例中,我杀死了所有线程,而不是让它们进入休眠状态。而且主线程会睡更多的时间。
    没有FlowControl @ 5M
             41331 function calls in 2.935 seconds
    
    Ordered by: standard name
    
    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
         1    2.007    2.007    2.935    2.935 SendStreamer.py:297(_transfer)
     13776    0.005    0.000    0.005    0.000 UDPFlowControl.py:52(ready)
         1    0.000    0.000    0.000    0.000 {built-in method len}
     13776    0.007    0.000    0.007    0.000 {built-in method monotonic}
         1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     13776    0.915    0.000    0.915    0.000 {method 'send' of '_socket.socket' objects}
    

    使用FlowControl @ 5M

    在这里等待设备要比在发送上花费更多的时间。
                68873 function calls in 5.245 seconds
    
    Ordered by: standard name
    
    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
         1    4.210    4.210    5.245    5.245 SendStreamer.py:297(_transfer)
     27547    0.030    0.000    0.030    0.000 UDPFlowControl.py:52(ready)
         1    0.000    0.000    0.000    0.000 {built-in method len}
     27547    0.011    0.000    0.011    0.000 {built-in method monotonic}
         1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     13776    0.993    0.000    0.993    0.000 {method 'send' of '_socket.socket' objects}
    

    仍然开放:分解为多个流程。 -仍根据流程使用情况重构类结构(我想可能要在最后一刻添加一些新结果)。在进行一些更详细的基准测试时,我发现第二个线程(VRT的解压缩)几乎花费了每次启动的时间。对于流程,这不再是造成速度下降的可能原因。

    我希望有所有必要的信息,如果我忘记了一些请询问!

    [Edit1]在“我已完成的工作”列表中添加了信息

    [Edit2]添加了第二个测试系统(Manjaro)的cProfiles

    [Edit3]添加了有关cProfile运行方式的信息。

    [Edit4]更多cProfiles +有关线程的答案

    [Edit5]删除了旧基准

    最佳答案

    我可以在Linux上以无特权用户python2的身份确认这一点。

    我认为您无能为力:

    # timing code:
    In [16]: @contextlib.contextmanager
       ....: def timeit():
       ....:     st = time.time()
       ....:     yield
       ....:     en = time.time()
       ....:     b = int(math.log10(en - st))
       ....:     data.setdefault(b, 0)
       ....:     data[b] += 1
    
    # Thus, timing data means:
    -6: number of times send took between 0.00000011 and 0.000001s
    -4: 0.0000011 ~ 0.00001
    -4: 0.000011 ~ 0.0001
    -3: 0.00011 ~ 0.001 (up to millisecond)
    -2: 0.0011 ~ 0.01 (1..10ms)
    
    # Regular blocking socket
    {-6: 2807, -5: 992126, -4: 5049, -3: 18}
    # Non-blocking socket
    {-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
    # socket with timeout=0
    {-6: 2249, -5: 992994, -4: 4749, -3: 8}
    # socket with timeout=1
    {-5: 994259, -4: 5727, -3: 8, -2: 6}
    

    看起来这种分布的尾部是指数的。

    我还增大了发送缓冲区,并偶尔添加了time.sleep来给内核时间来发送排队的数据包,但这没有帮助。这是有道理的,因为非阻塞也会偶尔出现缓慢的发送。

    我还尝试按照http://www.pycopia.net/_modules/pycopia/socket.html outq函数显式等待发送队列为空,这也没有改变分发。

    关于python - Python UDP套接字发送瓶颈(缓慢/延迟随机),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28846777/

    相关文章:

    python - IDLE shell 无法找到 cx_Oracle 的镜像(原因 : image not found)

    python - 属性错误: 'module' object has no attribute 'openSPI'

    node.js - WebSocket 握手响应 400,但仍然有效

    python - 多处理对 urllib2 没用?

    python - 两段代码(关于 python GIL)有什么区别?

    python - 有谁知道验证 MSISDN 格式手机号码的正则表达式?

    python - "Python: Current File (Integrated Terminal)"-Visual Studio 代码

    c# - Java 套接字断开连接报告与 C# 断开连接

    c - 如何查询服务器并获取 MX、A、NS 记录

    python - Python中的绿色线程和线程