c# - 具有序列 ID 的线程安全固定大小循环缓冲区

标签 c# collections concurrency queue thread-safety

我需要一个具有以下功能的队列:

  • 固定大小(即循环缓冲区)
  • 队列项具有 ID(如主键),它们是连续的
  • 线程安全(在多个 ASP.NET Core 请求中使用)

为了避免锁定,我 tried ConcurrentQueue 但发现竞争条件。所以我正在尝试一种自定义方法。

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
    public CircularBuffer(int capacity) => _capacity = capacity;
    private readonly int _capacity;

    private long _counter = 0;
    private readonly object _lock = new();

    public void Enqueue(T item)
    {
        lock (_lock) {         // works but feels "heavy"
            _counter++;
            item.Id = _counter;
            if (Count == _capacity) RemoveFirst();
            AddLast(item);
        }
    }
}

并测试:

public class Item : IQueueItem
{
    public long Id { get; set; }
    //...
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

它给出了正确的输出(即使被竞争线程排队,也是有序的,并且具有固定的大小,最旧的项目出队):

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

实际上,我有读取(即枚举)该队列的网络请求。

问题:如果一个线程正在枚举队列,而另一个线程正在向队列添加内容,则会出现错误。 (我可以在读取之前使用 ToList() ,但是对于一个大型队列来说,它将占用所有服务器的内存,因为这可能会通过多个请求每秒执行多次)。我该如何处理这种情况? 我使用了链表,但我可以灵活地使用任何结构。

(另外,这似乎是一个非常重的锁定部分;有没有更高效的方法?)

更新
正如下面的评论中所询问的:我期望队列有几百到几万个项目,但项目本身很小(只有一些原始数据类型)。我预计每秒都会排队。从 Web 请求读取的频率较低,假设每分钟读取几次(但可能与服务器写入队列同时发生)。

最佳答案

根据您在问题中提供的指标,您有很多选择。 CircularBuffer<T> 的预期用途并没有那么重。包装lock - protected Queue<T> 应该工作得很好。每次枚举时将队列内容复制到数组中(每分钟复制 10,000 个元素几次)的成本不太明显。现代机器可以在眨眼之间完成这些事情。您必须每秒枚举数百次集合,这才会开始(稍微)成为一个问题。

在我原来的答案( revision 3 )中,我建议使用 ImmutableQueue<T> 作为底层存储。经过仔细检查,我发现这个类(class)并不完全是付费类(class)。第一次枚举时,它会调用内部 BackwardsReversed属性(property)(source code),这是相当昂贵的。我的性能测试证实,这是一个比简单的 lock 更糟糕的解决方案。 - protected Queue<T>lonix's 所示答案,关于 CPU 时间和分配。

下面是类似想法的较低级别实现,它利用了这样一个事实:我们只需要 ImmutableQueue<T> 的功能子集。类(class)。这些项目存储在单链表结构中,无需额外成本即可枚举:

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private Node _head;
    private Node _tail;
    private int _count = 0;
    private long _lastId = 0;

    private class Node
    {
        public readonly T Item;
        public Node Next;
        public Node(T item) => Item = item;
    }

    public ConcurrentCircularBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count => Volatile.Read(ref _count);

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            Node node = new(item);
            if (_head is null) _head = node;
            if (_tail is not null) _tail.Next = node;
            _tail = node;
            if (_count < _capacity) _count++; else _head = _head.Next;
            item.Id = ++_lastId;
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        Node node; int count;
        lock (_locker) { node = _head; count = _count; }
        for (int i = 0; i < count && node is not null; i++, node = node.Next)
            yield return node.Item;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

lock 相比,这种方法的主要优点- protected Queue<T>是它最大限度地减少了争用。持有锁时完成的工作微不足道。

ConcurrentCircularBuffer<T> 的替代实现类,基于两个数组缓冲区并具有不同的优缺点,可以在 5th revision 中找到这个答案。

关于c# - 具有序列 ID 的线程安全固定大小循环缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72800733/

相关文章:

c# - AutoMapper 无法将 TestDbAsyncEnumerable 转换为 IQueryable

c# - 循环直到 TcpClient 响应完全读取

api - 如何在newman中运行来自集合的单个请求

c# - 使用另一个集合实现一个集合

ruby-on-rails - Rails 本地服务器同时发送和接收请求

java - 从修改覆盖中解决 ConcurrentModificationException

C# 正则表达式验证 "realistic"IP 值

c# - WCF Web 服务上的 System.OutOfMemoryException

java - 如何在java中序列化列表?

c++ - 并发读写的嵌入式数据库