C & 低级信号量实现

标签 c multithreading assembly mutex semaphore

我在考虑如何使用尽可能少的 asm 代码来实现信号量(不是二进制)。
在不使用互斥锁的情况下,我还没有成功地思考和编写它,所以这是迄今为止我能做的最好的事情:

全局:

#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct
{
    atomic_ullong    value;
    pthread_mutex_t *lock_op;
    bool             ready;
} semaphore_t;

typedef struct
{
    atomic_ullong   value;
    pthread_mutex_t lock_op;
    bool            ready;
} static_semaphore_t;

 /* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}

功能:
bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{   
    if(semaphore->ready) if(!(semaphore->lock_op = \
                             calloc(1, sizeof(pthread_mutex_t)))) return false;
    else                 pthread_mutex_destroy(semaphore->lock_op);   

    if(pthread_mutex_init(semaphore->lock_op, NULL))
            return false;

    semaphore->value = init_value;
    semaphore->ready = true;
    return true;
}

bool semaphore_wait(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    pthread_mutex_lock(&(semaphore->lock_op));
    while(!semaphore->value) __asm__ __volatile__("nop");
    (semaphore->value)--;
    pthread_mutex_unlock(&(semaphore->lock_op));
    return true;
}

bool semaphore_post(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
    return true;
}

是否可以仅使用几行代码,使用原子内置函数或直接在汇编中实现信号量(例如 lock cmpxchg )?

查看来自 <bits/sempahore.h> 的 sem_t 结构收录于 <semaphore.h>在我看来,它被选择了一条非常不同的道路......
typedef union
{
    char __size[__SIZEOF_SEM_T];
    long int __align;
} sem_t;



更新:

@PeterCordes 提出了一个绝对更好的解决方案,使用原子,没有互斥锁,直接对信号量值进行检查。

我仍然想更好地了解在性能方面改进代码的机会,利用内置的暂停函数或内核调用来避免 CPU 浪费,等待关键资源可用。

有一个互斥体和非二进制信号量的标准实现来进行比较也很好。
来自 futex(7)我读到:“Linux 内核提供 futexes(“快速用户空间互斥锁”)作为快速用户空间锁定和信号量的构建块。Futexes 非常基础,非常适合构建更高级别的锁定抽象,例如互斥锁,条件变量、读写锁、屏障和信号量。”

最佳答案

请参阅我的最小天真信号量实现的部分内容,它可能有效。它编译并适合 x86。我认为这对于任何 C11 实现来说都是正确的。

IIRC,可以到 implement a counting lock (aka semaphore) with just a single integer ,您可以通过原子操作访问它。该维基百科链接甚至提供了 up 的算法/down .您不需要单独的互斥锁。如 atomic_ullong需要一个互斥锁来支持 objective-c PU 上的原子递增/递减,它将包括一个。 (在 32 位 x86 上可能是这种情况,或者实现使用慢 cmpxchg8 而不是快速 lock xadd 。32 位计数器对于您的信号量来说真的太小了吗?因为 64 位原子在 32 位机器上会更慢。)
<bits/sempahore.h> union 定义显然只是一个具有正确大小的不透明 POD 类型,并不表示实际实现。

正如@David Schwartz 所说,除非您是专家,否则为实际使用实现自己的锁定是愚蠢的。不过,这可能是一种有趣的方式来了解原子操作并找出标准实现中的内幕。请仔细注意他的警告,即锁定实现很难测试。您可以使用当前版本的编译器和您选择的编译选项编写适用于您的硬件上的测试用例的代码......
ready boolean 只是完全浪费空间。如果可以正确初始化ready标志以便函数查看它有意义,然后您可以将其他字段初始化为合理的初始状态。

我注意到您的代码还有一些其他问题:

#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};

static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value

使用动态分配的 pthread_mutex_t *lock_op只是愚蠢。使用值,而不是指针。您的大多数锁定功能都使用互斥锁,因此额外的间接级别只会减慢速度。内存和计数器一起在那里会好得多。互斥锁不需要太多空间。
while(!semaphore->value) __asm__ __volatile__("nop");

我们希望这个循环避免浪费功率和减慢其他线程的速度,甚至其他逻辑线程与超线程共享相同的核心。
nop不会使忙等待循环减少 CPU 密集型。即使使用超线程,它在 x86 上也可能没有区别,因为整个循环体可能仍然适合 4 uop,因此每个时钟的一次迭代中是否存在 nop 的问题在那里与否。 nop不需要执行单元,所以至少它不会受到伤害。这个自旋循环发生在持有互斥锁的情况下,这看起来很愚蠢。所以第一个服务员将进入这个自旋循环,而之后的服务员将在互斥锁上自旋。

这是我对信号量的幼稚实现,仅使用 C11 原子操作

我认为这是一个很好的实现,它实现了正确和小(源代码和机器代码)的非常有限的目标,并且不使用其他实际的锁定原语。有些主要领域我什至没有尝试解决(例如公平/饥饿,将 CPU 交给其他线程,可能是其他东西)。

asm output on godbolt : down 只有 12 个 x86 insns , 2 为 up (包括 ret s)。 Godbolt 的非 x86 编译器(ARM/ARM64/PPC 的 gcc 4.8)太旧,无法支持 C11 <stdatomic.h> . (不过,他们确实有 C++ std::atomic )。所以不幸的是,我无法轻松检查非 x86 上的 asm 输出。
#include <stdatomic.h>
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)

typedef struct {
  atomic_int val;   // int is plenty big.  Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;

#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); }  // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif

void sem_down(naive_sem_t *sem)
{
  while (1) {
    while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
        spinloop_body();  // wait for a the semaphore to be available
    int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel );  // try to take the lock.  Might only need mo_acquire
    if (likely(tmp >= 1))
        break;              // we successfully got the lock
    else                    // undo our attempt; another thread's decrement happened first
        atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
  }
}
// note the release, not seq_cst.  Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
    atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}

这里的诀窍是 val 没问题暂时太低 ;这只会让其他线程旋转。另请注意 fetch_add单个原子操作是关键 .它返回旧值,因此我们可以检测何时 val在 while 循环的加载和 fetch_add 之间被另一个线程占用。 (注意,我们不需要检查 tmp 是否 == 到 while 循环的加载:如果另一个线程 up 在加载和 fetch_add 之间编辑信号量就可以了。这是使用 fetch_add 而不是cmpxchg)。
atomic_load自旋循环只是对让所有服务员在 val 上进行原子读-修改-写的性能优化。 . (尽管许多服务员试图用 inc 解密然后撤消,但让服务员看到解锁的锁可能非常罕见)。

一个真正的实现将有更多平台的特殊东西,而不仅仅是 x86。对于 x86,可能不仅仅是 PAUSE自旋循环内的指令。这仍然只是一个完全可移植的 C11 实现的玩具示例。 PAUSE显然有助于避免对内存排序的错误推测,因此在离开自旋循环后 CPU 运行更有效。 pause与将逻辑 CPU 交给操作系统以运行不同线程不同。也与memory_order_???的正确性和选择无关。参数。

一个真正的实现可能会在一定次数的旋转迭代后将 CPU 交给操作系统( sched_yield(2) ,或更可能是 futex 系统调用,见下文)。也许使用 x86 MONITOR/MWAIT对超线程更加友好;我不知道。我从来没有真正实现过锁定自己,我只是在查找其他 insn 时在 x86 insn 引用中看到所有这些东西。

如前所述,x86 的 lock xadd指令工具fetch_add (具有顺序一致性语义,因为 lock ed 指令总是一个完整的内存屏障)。在非 x86 上,只对 fetch_add 使用获取+释放语义,而不是完全顺序一致性可能会允许更高效的代码。我不确定,但只使用 acquire很可能允许在 ARM64 上使用更高效的代码。我想我们只需要 acquire on the fetch_add, not acq_rel ,但我不确定。在 x86 上,代码没有任何区别,因为 lock ed 指令是执行原子读-修改-写的唯一方法,所以即使 relaxed将与 seq_cst 相同(除了 compile-time reordering 。)

如果你想让出 CPU 而不是旋转,你需要一个系统调用(正如人们所说)。显然,为了使标准库锁定在 Linux 上尽可能高效,已经进行了大量工作。当锁被释放时,有专门的系统调用来帮助内核唤醒正确的线程,而且它们使用起来并不简单。 From futex(7) :

NOTES
To reiterate, bare futexes are not intended as an easy-to-use abstraction for end-users. (There is no wrapper function for this system call in glibc.) Implementors are expected to be assembly literate and to have read the sources of the futex user-space library referenced below.



公平/饥饿(我幼稚的实现忽略了)

正如维基百科文章所提到的,某种唤醒队列是一个好主意,因此同一个线程不会每次都获得信号量。 (释放后快速获取锁的代码通常会让释放线程在其他线程仍处于 sleep 状态时获得锁)。

这是进程中内核协作的另一个主要好处 (futex)。

关于C & 低级信号量实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36094115/

相关文章:

c - 在 Assembly x86 中遍历二维数组

c - 打印一个后如何停止循环?

java - 如何使用基于 Hibernate 的 DAO 从不同线程实现数据库中表的更新?

c# - 这是实现异步编程模型的好方法吗?

用汇编函数编译 C 程序,反之亦然

c - '[' 标记之前的汇编语言解析错误

c - 在 pthread_create 中使用函数指针数组 - 接近初始化

c - 静态变量重新初始化解决方案

c - printf() 修复指针问题

java - Thread.sleep(xx) 但整个程序都 hibernate 了?