c - 无锁栈实现思路——目前已失效

标签 c stack lockless rcu aba

我想出了一个想法,我正在尝试实现一个无锁堆栈,它不依赖引用计数来解决 ABA 问题,并且还可以正确处理内存回收。它在概念上与 RCU 相似,并且依赖于两个功能:将列表条目标记为已删除,以及跟踪遍历列表的读者。前者很简单,它只是使用指针的 LSB。后者是我在实现无限制无锁堆栈的方法上的“聪明”尝试。

基本上,当任何线程尝试遍历列表时,一个原子计数器 (list.entries) 会递增。当遍历完成时,第二个计数器 (list.exits) 会增加。

节点分配由 push 处理,释放由 pop 处理。

push 和 pop 操作与朴素的无锁堆栈实现非常相似,但是必须遍历标记为要删除的节点才能到达未标记的条目。因此,推送基本上很像链表插入。

pop 操作类似地遍历列表,但它使用 atomic_fetch_or 在遍历时将节点标记为已删除,直到到达未标记的节点。

在遍历 0 个或多个标记节点的列表后,正在弹出的线程将尝试对堆栈的头部进行 CAS。至少有一个线程并发弹出会成功,在此之后所有进入堆栈的读取器将不再看到以前标记的节点。

成功更新列表的线程然后加载原子 list.entries,并且基本上自旋加载 atomic.exits,直到该计数器最终超过 list.entries。这应该意味着列表的“旧”版本的所有读者都已完成。然后线程简单地释放它从列表顶部交换的标记节点列表。

所以 pop 操作的含义应该是(我认为)不存在 ABA 问题,因为在所有使用它们的并发读者完成之前,被释放的节点不会返回到可用的指针池中,显然内存回收问题出于同样的原因,也会被处理。

所以无论如何,这是理论,但我仍然对实现摸不着头脑,因为它目前不起作用(在多线程情况下)。似乎我在免费问题之后得到了一些写作,但我在发现问题时遇到了麻烦,或者我的假设可能有缺陷并且它不起作用。

任何关于概念和调试代码方法的见解都将不胜感激。

这是我当前的(损坏的)代码(使用 gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c 编译):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}

最佳答案

您提到您正在尝试解决 ABA 问题,但描述和代码实际上是尝试解决一个更难的问题:memory reclamation问题。

这个问题通常出现在用没有垃圾收集的语言实现的无锁收集的“删除”功能中。核心问题是从共享结构中删除节点的线程通常不知道何时可以安全地释放删除的节点,因为其他读取可能仍然引用它。解决这个问题通常,作为一个副作用,也解决了 ABA 问题:这特别是关于 CAS 操作成功,即使底层指针(和对象的状态)在此期间至少更改了两次,最终得到原始值,但呈现完全不同的状态。

ABA 问题在某种意义上更容易,因为有几个直接解决 ABA 问题的方法,特别是不会导致“内存回收”问题的解决方案。从某种意义上说,可以检测位置修改的硬件(例如,使用 LL/SC 或事务性内存原语)可能根本不会出现问题,这也更容易。

也就是说,您正在寻找内存回收问题的解决方案,它也将避免 ABA 问题。

您的问题的核心是以下声明:

The thread that successfully updates the list then loads the atomic list.entries, and basically spin-loads atomic.exits until that counter finally exceeds list.entries. This should imply that all readers of the "old" version of the list have completed. The thread then simply frees the the list of marked nodes that it swapped off the top of the list.



这个逻辑不成立。等待list.exits (你说 atomic.exits 但我认为这是一个错字,因为你只在其他地方谈论 list.exits)大于 list.entries只告诉您,在变异线程捕获条目计数时,总退出数比条目数多。然而,这些退出可能是由来来往往的新读者产生的:这根本不意味着所有的老读者都像你所说的那样完成了!

这是一个简单的例子。首先写一个线程T1和阅读线程T2大约在同一时间访问列表,所以 list.entries是 2 和 list.exits为0。写线程弹出一个节点,并保存list.entries的当前值(2)并等待lists.exits大于 2。现在多了三个阅读线程,T3 , T4 , T5到达并快速阅读列表并离开。现在lists.exits是 3,并且满足您的条件并且 T1释放节点。 T2虽然它没有去任何地方并且因为它正在读取一个释放的节点而爆炸!

你的基本想法可以工作,但你的两个反方法尤其是绝对行不通。

这是一个经过充分研究的问题,因此您不必发明自己的算法(参见上面的链接),甚至不必编写自己的代码,因为像 librcu 这样的东西和 concurrencykit已经存在。

用于教育目的

但是,如果您想让这项工作用于教育目的,一种方法是使用确保在修改后进入的线程已经开始使用不同的 list.entry/exit 集合。计数器。一种方法是生成计数器,当作者想要修改列表时,它会增加生成计数器,这会导致新读者切换到不同的 list.entry/exit 集合。计数器。

现在作者只需要等待list.entry[old] == list.exists[old] ,这意味着所有的老读者都离开了。您也可以每代只使用一个计数器:您实际上并没有两个 entry/exit计数器(尽管它可能有助于减少争用)。

当然,你知道管理这个每代单独计数器列表的新问题......这看起来像构建无锁列表的原始问题!这个问题稍微容易一些,因为您可能会对“正在运行”的代数设置一些合理的限制,并预先将它们全部分配,或者您可能会实现一种更容易推理的有限类型的无锁列表因为添加和删除只发生在头部或尾部。

关于c - 无锁栈实现思路——目前已失效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50803497/

相关文章:

c++ - 如何在C++中使用无锁循环缓冲区实现零拷贝tcp

计算字符数组中的个数

更改内存中的 'call'地址?

c - 使用 CAS(Compare And Swap)时,如何确保旧值确实是旧值?

c - C语言中的栈,如何实现

c# - 栈空异常

c++ - 无锁 fifo 缓冲区中的删除节点检测

c - 如何计算文件中每个数字的出现频率?

c - 在 C 语言中使用 If else 处理 Char

c - 如何将两个字节数组转换为无符号长变量?