sockets - 使用kqueue异步发送数据

标签 sockets asynchronous tcp pthreads kqueue

我有一台用普通C语言编写的服务器,在FreeBSD上使用kqueue接受TCP连接。

接受传入的连接,并将其添加到简单的连接池中,以跟踪文件句柄。

收到数据时(在EVFILT_READ上),我调用recv(),然后将有效负载放入消息队列中,以供其他线程处理。
通过这种方式接收和处理数据非常完美。

处理线程完成后,可能需要将某些内容发送回客户端。由于处理线程可以访问连接池并且可以轻松获取文件句柄,因此我只是从处理线程中调用send()。

这在99%的时间内都有效,但是kqueue时不时地给我一个EV_EOF标志,并断开了连接。

send()的调用频率与EV_EOF错误的数量之间存在明显的相关性,因此由于kqueue线程与处理线程之间的某些竞争状况,我感到EV_EOF。

对send()的调用始终返回预期的字节数,因此我没有填满tx缓冲区。

所以我的问题是; 是否可以按此处所述从单独的线程调用send()?如果没有,那么将数据异步发送回客户端的正确方法是什么?

我发现的所有示例都在与kqueue循环相同的上下文中调用send(),但是我的处理线程可能需要随时发送回数据-即使在从客户端最后一次接收到数据之后几分钟也是如此-显然我无法阻止那个时候的kqueue循环

相关代码段:

void    *tcp_srvthread(void *arg)
{
    [[...Bunch of declarations...]]

    tcp_serversocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);        
    ...
    setsockopt(tcp_serversocket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));            
    ...
    err = bind(tcp_serversocket, (const struct sockaddr*)&sa, sizeof(sa));
    ...
    err = listen(tcp_serversocket, 10);
    ...
    kq = kqueue();    
    EV_SET(&evSet, tcp_serversocket, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, NULL);
    ...
    while(!fTerminated) {
        timeout.tv_sec = 2;  timeout.tv_nsec = 0;
        nev = kevent(kq, &evSet, 0, evList, NLIST, &timeout);

        for (i=0; i<nev; i++) {            
            if (evList[i].ident == tcp_serversocket) {                              // new connection?
                socklen = sizeof(addr);
                fd = accept(evList[i].ident, &addr, &socklen);   // accept it
                if(fd > 0) {                                                       // accept ok?
                    uidx = conn_add(fd, (struct sockaddr_in *)&addr);               // Add it to connected controllers
                    if(uidx >= 0) {                                                 // add ok?                        

                        EV_SET(&evSet, fd, EVFILT_READ | EV_CLEAR, EV_ADD, 0, 0, (void*)(uint64_t)(0x00E20000 | uidx));        // monitor events from it
                        if (kevent(kq, &evSet, 1, NULL, 0, NULL) == -1) {           // monitor ok?
                            conn_delete(uidx);                                      // ..no, so delete it from my list also
                        }
                    } else {                                                        // no room on server?
                        close(fd);
                    }
                }
                else Log(0, "ERR: accept fd=%d", fd);
            }
            else
            if (evList[i].flags & EV_EOF) {
                [[ ** THIS IS CALLED SOMETIMES AFTER CALLING SEND - WHY??  ** ]]
                uidx = (uint32_t)evList[i].udata;                
                conn_delete( uidx );               
            }
            else
            if (evList[i].filter == EVFILT_READ) {                  
                if((nr = recv(evList[i].ident, buf, sizeof(buf)-2, 0)) >     0) {
                    uidx = (uint32_t)evList[i].udata;
                    recv_data(uidx, buf, nr);    // This will queue the message for the processing thread
                }
            }
        }
        else {
            // should not get here.
        }

    }
}

处理线程看起来像这样(显然,除了显示的内容外,还有很多数据操作在进行中):
void    *parsethread(void *arg)
{         
    int                 len;
    tmsg_Queue          mq;
    char                is_ok;

    while(!fTerminated) {
        if((len = msgrcv(msgRxQ, &mq, sizeof(tmsg_Queue), 0, 0)) > 0) {            
             if( process_message(mq) ) {
                 [[ processing will find the uidx of the client and build the return data ]]
                 send( ctl[uidx].fd, replydata, replydataLen, 0 );
             }
        }
    }
}

赞赏任何想法或朝着正确的方向前进。谢谢。

最佳答案

EV_EOF
如果在对等方关闭套接字的读取部分后写入套接字,则会收到RST,它会触发EVFILT_READ,并设置EV_EOF。
异步的
您应该尝试aio_read和aio_write。

关于sockets - 使用kqueue异步发送数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52999294/

相关文章:

multithreading - 帮我推理一下 F# 线程

c# - 将方法存储在自定义 Tcp 或 Udp 服务器的字典中。这是个好主意吗? C#

python - Python 的 socket.getaddrinfo/mercurial 未使用持久性 DNS 缓存

sockets - 什么是 AF_INET,为什么需要它?

sockets - 可以可靠地将 TCP 套接字连接到自身吗?

c# - Entity Framework 。删除所有行然后添加新行。异步

c - 如何在 C 中获取事件网络接口(interface)的列表?

c# - IAsyncEnumerable<T> 与 IEnumerable<Task<T>> 之间有什么区别?

linux - 关闭连接后如何保持套接字可见?

tcp - 我可以将 ZeroMQ 与基于软件的负载均衡器 HAProxy 一起使用吗?