c++ - 无法同时在两个 10Gbps 接口(interface)上达到全线速

标签 c++ linux sockets networking

我正在开发生成吞吐量为数十 Gbit/s 的实时数据流的应用程序。我不需要任何响应,所以我使用 UDP。我将数据包发送到多播地址。我的系统 (Centos 7) 有 2 个 10 Gbit/s 的网络端口。

当我尝试同时通过两个端口发送数据时遇到了麻烦。我预计速度会略低于 20 Gbit/s,但实际上只得到了 11-12 Gbit/s。如果仅使用 1 个端口,我可以获得 9.5 Gbit/s 的速度。

我使用 select() 和非阻塞套接字。这是可执行演示:

#include <arpa/inet.h>
#include <malloc.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/fcntl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
// #include <sys/resource.h>

#include <cerrno>
#include <cstring>
#include <ctime>
#include <string>



#define GROUP_PORT 3490
#define GROUP_ADDR "225.0.0.37"
#define INTERFACES 2
#define LOCAL_INTERFACE_IP0 "192.168.2.3"
#define LOCAL_INTERFACE_IP1 "192.168.2.4"



/**
 * Returns the time elapsed from t1 to t2 in whole microseconds.
 *
 * The seconds and nanoseconds parts are combined into a single nanosecond
 * count before dividing. Dividing the nanosecond difference on its own
 * truncates toward zero, which over-reports the interval by one microsecond
 * whenever t2.tv_nsec < t1.tv_nsec (i.e. a second boundary was crossed).
 *
 * @param t1 start timestamp
 * @param t2 end timestamp
 * @return (t2 - t1) in microseconds, truncated toward zero
 */
inline long duration_mcs(timespec t1, timespec t2) {
    const long long elapsed_ns =
        static_cast<long long>(t2.tv_sec - t1.tv_sec) * 1000000000LL
        + (t2.tv_nsec - t1.tv_nsec);
    return static_cast<long>(elapsed_ns / 1000);
}



int main(int argc, char* argv[])
{

    // if (setpriority(PRIO_PROCESS, 0, -15) == -1) {
    //     printf("PRIO failed: %s.\n", std::strerror(errno));
    //     return -1;
    // }

    //bind thread to specific core
    cpu_set_t set;
    CPU_ZERO(&set); //clear cpu set
    int cpuId = 5;
    CPU_SET(cpuId, &set); //dedicate cpu for current thread (add cpuId to set)
    //bind current thread (pId=0) to dedicated cpu
    if (sched_setaffinity(0, sizeof(set), &set) == -1) {
        printf("sched_setaffinity failed: %s.\n", std::strerror(errno));
        return -1;
    }


    // SETUP INTERFACES ADDRESSES ----------------------------------
    // -------------------------------------------------------------
    in_addr localInterface[INTERFACES];
    localInterface[0].s_addr = inet_addr(LOCAL_INTERFACE_IP0);
    localInterface[1].s_addr = inet_addr(LOCAL_INTERFACE_IP1);


    // SETUP SOCKETS -----------------------------------------------
    // -------------------------------------------------------------
    int fdmax = 0;
    int fds[INTERFACES];
    int flags;

    for (int i=0; i<INTERFACES; ++i) {
        fds[i] = socket(AF_INET, SOCK_DGRAM, 0);
        if (fds[i] == -1) {
            printf("Socket %d failed: %s.\n", i, std::strerror(errno));
            return -1;
        }

        //make sockets NONBLOCK
        if ((flags = fcntl(fds[i], F_GETFL, 0)) < 0) 
        {
            printf("F_GETFL on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0) 
        {
            printf("O_NONBLOCK on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fds[i] > fdmax) fdmax = fds[i];
        printf("Socket %d success.\n", i);
    }



    // SETUP SOCKET OPTIONS ----------------------------------------
    // -------------------------------------------------------------
    // send packets through particular interface
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_IF, (char*) &localInterface[i], sizeof(localInterface[i])) == -1) {
            printf("IP_MULTICAST_IF on interface %s failed: %s.\n", inet_ntoa(localInterface[i]),  std::strerror(errno));
            return -1;
        }
    }

    // disable multicast loop
    char loopch=0;
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_LOOP, (char*) &loopch, sizeof(loopch)) == -1) {
            printf("IP_MULTICAST_LOOP on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
            return -1;
        }
    }


    // SETUP ADDRESS STRUCTURE FOR SENDING PACKETS TO --------------
    // -------------------------------------------------------------
    sockaddr_in address;
    address.sin_family = AF_INET;
    address.sin_port = htons(GROUP_PORT);
    address.sin_addr.s_addr = inet_addr(GROUP_ADDR);


    // SETUP DATA BUFFER -------------------------------------------
    // -------------------------------------------------------------
    size_t buf_size = 50000;
    char* buffer = (char*) memalign(256, buf_size);


    // SETUP SELECT() STRUCTURES -----------------------------------
    // -------------------------------------------------------------
    fd_set master, writefds;
    FD_ZERO(&master);
    FD_ZERO(&writefds);
    for (int i=0; i<INTERFACES; ++i) {
        FD_SET(fds[i], &master);
    }


    // SENDING PACKETS ---------------------------------------------
    // -------------------------------------------------------------
    size_t packets = 10000; //number of packets to send
    size_t nbytes = 0;
    int snt;

    bool pckt_flag = false; //flag for all packets are sent

    size_t cnt[INTERFACES]; //counter for sent packets per each interface
    for (int ifs=0; ifs<INTERFACES; ++ifs) cnt[ifs] = 0;

    timespec t1, t2;
    timespec t1_sel, t2_sel;
    timespec t1_proc, t2_proc;
    timespec t1_snd, t2_snd;
    long tsum_sel = 0, tsum_proc = 0, tsum_snd = 0;

    clock_gettime(CLOCK_MONOTONIC_RAW, &t1);

    while (!pckt_flag) {
        writefds = master;

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_sel);
        if (select(fdmax+1, NULL, &writefds, NULL, NULL) == -1) {
            printf("select() failed: %s.\n", std::strerror(errno));
            return -1;
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_sel);
        tsum_sel += duration_mcs(t1_sel, t2_sel);

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_proc);
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            if (FD_ISSET(fds[ifs], &writefds)) {
                //check for how many packets were sent over the interface
                if (cnt[ifs] < packets) {

                    clock_gettime(CLOCK_MONOTONIC_RAW, &t1_snd);
                    snt = sendto(fds[ifs], buffer, buf_size, 0, (sockaddr*) &address, sizeof(address));
                    clock_gettime(CLOCK_MONOTONIC_RAW, &t2_snd);
                    tsum_snd += duration_mcs(t1_snd, t2_snd);

                    if (snt < buf_size) {
                        printf("Sending error: sent %d of %d bytes\n", snt, buf_size);
                    } else {
                        nbytes += snt;
                        ++cnt[ifs];
                    }
                }
            }
        }
        //renew flag
        pckt_flag = true;
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            pckt_flag = (pckt_flag && (cnt[ifs] == packets));
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_proc);
        tsum_proc += duration_mcs(t1_proc, t2_proc);
    }
    clock_gettime(CLOCK_MONOTONIC_RAW, &t2);

    size_t traf_tot_bytes = nbytes;
    double duration_sec = (double) duration_mcs(t1, t2)/1000000;

    printf("Time %f s.\n", duration_sec);
    printf("Total bytes sent %d.\n", traf_tot_bytes);
    printf("Total throughput %f Gbit/s.\n", 8*(traf_tot_bytes/duration_sec)/1000000000);
    printf("Packets sent by interfaces %d/%d\n", cnt[0], cnt[1]);
    printf("tsum_sel = %d\n", tsum_sel);
    printf("tsum_proc = %d\n", tsum_proc);
    printf("tsum_snd = %d\n", tsum_snd);


    free(buffer);

    return 0;

}

在这个演示中,我插入了计时器,分别用于记录等待 select() 的总时间 (tsum_sel)、数据包处理的总时间 (tsum_proc) 以及 sendto() 调用本身的总时间 (tsum_snd)。

在我的系统上使用 INTERFACES = 1 时的输出:

Socket 0 success.
Time 0.429122 s.
Total bytes sent 500000000.
Total throughput 9.321358 Gbit/s.
Packets sent by interfaces 10000/0
tsum_sel = 51086
tsum_proc = 362756
tsum_snd = 358939

对于 INTERFACES = 2:

Socket 0 success.
Socket 1 success.
Time 0.697962 s.
Total bytes sent 1000000000.
Total throughput 11.461942 Gbit/s.
Packets sent by interfaces 10000/10000
tsum_sel = 2383
tsum_proc = 662971
tsum_snd = 652629

我看到几乎所有时间都被 sendto() 函数消耗了。所以看起来我通过第一个接口(interface)发送数据包,等待 sendto 返回,然后发送到第二个接口(interface)。为了避免这种情况,我让我的套接字成为非阻塞的。我不明白这是怎么回事。

我的问题是:

1) 为什么这段代码不以 20 Gbit/s 的速率发送数据?

2) 为什么非阻塞 sendto() 需要那么多时间?

3) 如何在这里获得 20 Gbit/s?

最佳答案

嗯,我达到了 19 Gbit/s。

我创建了 2 个线程 - 每个线程独立发送数据。这看起来很简单,但有一些细微之处。如果我将两个线程绑定(bind)到同一个虚拟核心,问题仍然存在,只能达到 14-15 Gbit/s。只有当我将线程绑定(bind)到不同的虚拟核心时它才能正常工作,即使这些虚拟核心位于同一个物理核心上也可以。这是我所能希望的最好的结果。我可以使用 1 个物理核心进行系统维护和联网。

感谢所有评论的人。

关于c++ - 无法同时在两个 10Gbps 接口(interface)上达到全线速,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48786450/

相关文章:

c++ - 如何将整数值传递给 (const char *str) 函数参数?

linux i2c 模块不完整 xfer (0x48) 错误

c++ - 如何读取证书以使用 openssl 验证签名?

sockets - Linux 用户空间代码,用于在 Linux 板和运行 contiki udp 发送器示例代码的每个节点之间进行通信

c++ - 返回 "file already exists"错误的窗口类的 RegisterClass

c++ - Gstreamer rtsp流到appsink到openCV

java - 同一个套接字的输入流和输出流如何看到对方?

python - 如何使用 "namespace"和参数连接到远程套接字服务器?

c++ - 需要引用标准关于 main 函数作为模板函数的合法性

linux - 带有 while 循环的 Bash 脚本,直到满足动态条件