c# - 可以将其当前计数减少 N(N>=1)的信号量?

标签 c# multithreading thread-synchronization

我正在实现一个流量控制组件来限制可以发送的最大请求数。每个工作线程都可以发送单个请求或一批请求,但任何时候待处理请求的总数不应超过最大数量。

我最初想用 SemaphoreSlim 来实现: 将信号量初始化为最大请求数,然后当工作线程要调用服务时,它必须获得足够数量的 token ,但是我发现实际上 SemaphoreSlim 和 Semaphore 只允许线程将信号量计数减少 1,在我的例子中我希望通过工作线程携带的请求数来减少计数。

我应该在这里使用什么同步原语?

澄清一下,该服务支持批处理,因此一个线程可以在一次服务调用中发送 N 个请求,但相应地它应该能够将信号量的当前计数减少 N。

最佳答案

下面是自定义SemaphoreManyFifo提供方法的类 Wait(int acquireCount)方法和Release(int releaseCount) .它的行为是严格的 FIFO。它具有相当不错的性能(在我的 PC 的 8 个线程上每秒约 500,000 次操作)。

public class SemaphoreManyFifo : IDisposable
{
    private readonly object _locker = new object();
    private readonly Queue<(ManualResetEventSlim, int AcquireCount)> _queue;
    private readonly ThreadLocal<ManualResetEventSlim> _pool;
    private readonly int _maxCount;
    private int _currentCount;

    public int CurrentCount => Volatile.Read(ref _currentCount);

    public SemaphoreManyFifo(int initialCount, int maxCount)
    {
        // Proper arguments validation omitted
        Debug.Assert(initialCount >= 0);
        Debug.Assert(maxCount > 0 && maxCount >= initialCount);
        _queue = new Queue<(ManualResetEventSlim, int)>();
        _pool = new ThreadLocal<ManualResetEventSlim>(
            () => new ManualResetEventSlim(false), trackAllValues: true);
        _currentCount = initialCount;
        _maxCount = maxCount;
    }
    public SemaphoreManyFifo(int initialCount) : this(initialCount, Int32.MaxValue) { }

    public void Wait(int acquireCount)
    {
        Debug.Assert(acquireCount > 0 && acquireCount <= _maxCount);
        ManualResetEventSlim gate;
        lock (_locker)
        {
            Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
            if (acquireCount <= _currentCount && _queue.Count == 0)
            {
                _currentCount -= acquireCount; return; // Fast path
            }
            gate = _pool.Value;
            gate.Reset(); // Important, because the gate is reused
            _queue.Enqueue((gate, acquireCount));
        }
        gate.Wait();
    }

    public void Release(int releaseCount)
    {
        Debug.Assert(releaseCount > 0);
        lock (_locker)
        {
            Debug.Assert(_currentCount >= 0 && _currentCount <= _maxCount);
            if (releaseCount > _maxCount - _currentCount)
                throw new SemaphoreFullException();
            _currentCount += releaseCount;
            while (_queue.Count > 0 && _queue.Peek().AcquireCount <= _currentCount)
            {
                var (gate, acquireCount) = _queue.Dequeue();
                _currentCount -= acquireCount;
                gate.Set();
            }
        }
    }

    public void Dispose()
    {
        foreach (var gate in _pool.Values) gate.Dispose();
        _pool.Dispose();
    }
}

在上述实现中添加对超时和取消的支持并非易事。它需要一个不同的(可更新的)数据结构而不是 Queue<T> .


原文 Wait+Pulse 实现可以在 1st revision 中找到这个答案。它很简单,但缺少理想的 FIFO 行为。

关于c# - 可以将其当前计数减少 N(N>=1)的信号量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56283112/

相关文章:

c# - 在 C# 中将消息分成更小的 block

c# - MVVM-更改 View 模型中的字段时验证模型

c# - 如何设置属性选择器的值 Expression<Func<T,TResult>>

c++ - 如何允许线程 2 在我在线程 1 中打开的端口上进行通信?

java - 如何使用同步线程将数据插入到我的数据库表中

c# - 电报机器人 : GetUpdates Conflict

java - Java多线程中如何使用CountDownLatch?

c++ - 为什么作者声称此代码会导致种族歧视?

c# - WPF Dispatcher 线程卡住主窗口

java - 线程同步 - 同步三个线程打印 012012012012..... 不起作用