c - 服务器中的 Leader/Follower pthread 实现

标签 c multithreading concurrency pthreads

我正在尝试实现一个基于领导者/追随者并发架构的基本服务器。 我最初只想创建一个线程池并创建一个线程队列,但在阅读此答案 (Explain "Leader/Follower" Pattern) 后,我觉得我的算法不正确,因为所选答案声称不需要互斥锁。

leader/follower 背后的思想是,你初始化一个线程池,一个线程作为“leader”线程,其余的都是“follower”线程。在服务器中,领导线程将监听传入的连接,而跟随者线程则全部休息。当领导线程检测到传入连接时,其中一个空闲的跟随者线程将被提升为新的领导线程,而前领导线程接受连接并为请求提供服务。当前领导线程完成请求服务后,它成为休息的跟随者线程。

但是,如果不使用互斥锁和条件变量,我看不到任何实现方法。目前我的实现使用线程池,并且只允许每个线程在互斥体中接受新连接。

有人可以提供领导者/追随者实现的高级解释吗?

这是我为当前实现编写的一些代码。

#define THREAD_COUNT 10
pthread_mutex_t request_tx;

int main(int argc, char* argv[])
{
    pthread_t threadA[THREAD_COUNT];
    pthread_mutex_init(&request_tx, NULL);

    //initialize server socket stuff

    for (int i = 0; i < THREAD_COUNT; ++i)
    {
        pthread_create(&threadA[i], NULL, rest, NULL);
    }

    for (int i = 0; i < THREAD_COUNT; ++i)
        pthread_join(threadA[i], NULL);

    return 0;
}

void* rest(void* kargs)
{
    int client_fd;
    struct sockaddr_in cli_addr;
    socklen_t sin_t = sizeof(cli_addr);
    while (1)
    {
        pthread_mutex_lock(&request_tx);
        client_fd = accept(server_fd, (struct sockaddr*) &cli_addr, &sin_t);
        if (client_fd > 0)
            serve(client_fd);
        else
            pthread_mutex_unlock(&request_tx);
    }
}

void serve(int client_fd)
{
    pthread_mutex_unlock(&request_tx);
    // serve request here
    ...
}

最佳答案

链接的答案是错误的。同步在某处绝对是必要的。在您的示例中,您可以删除互斥锁,因为 accept 将在内核中阻塞,并且(大多数?)实现将为传入连接激活一个线程。但是,内核内部仍然存在一些同步。

您的示例有点误导,因为领导者/追随者模式通常用于将单个请求 分配到线程池的多个连接上,而不是连接。在这一点上,事情变得相当复杂,因为完成处理的跟随者需要告诉领导者一个新的连接需要监视。如果您仅限于标准 POSIX 接口(interface),这可能会非常复杂。 (使用 epoll,应该可以将大部分复杂性移交给内核。)

一般来说,我会谨慎采用 90 年代的事件处理模式。从那时起架构发生了显着变化:NUMA 机器不再稀有,系统可以相当轻松地在单个进程中处理数万个线程,并且可以使用其他事件处理接口(interface),例如 epoll

关于c - 服务器中的 Leader/Follower pthread 实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50673979/

相关文章:

C, 数组中的随机元素打印到终端时变成 '?' 字符

c++ - 是否可以在不继承任何 Qt 对象的情况下使用 Qt 线程?

python - 在 Python 中使用 Multiprocessing+Threads 加速应用程序时遇到问题

c++ - C/C++ 函数定义

c - 大于 BST 中某个数字的节点之和

c++ - 为什么同一次执行的时间不同?

c - 当线程因某些错误而终止时如何得到通知

Java 多线程示例在不同线程上打印消息 100 次?

java - 如何编写一个 UDP 服务器来为来自不同客户端的 n 个并发请求提供服务?

multithreading - 多线程中的 Vulkan 队列同步