我需要一个具有以下功能的队列:
- 固定大小(即循环缓冲区)
- 队列项具有 ID(如主键),它们是连续的
- 线程安全(在多个 ASP.NET Core 请求中使用)
为了避免锁定,我 tried ConcurrentQueue
但发现竞争条件。所以我正在尝试一种自定义方法。
public interface IQueueItem
{
long Id { get; set; }
}
public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
public CircularBuffer(int capacity) => _capacity = capacity;
private readonly int _capacity;
private long _counter = 0;
private readonly object _lock = new();
public void Enqueue(T item)
{
lock (_lock) { // works but feels "heavy"
_counter++;
item.Id = _counter;
if (Count == _capacity) RemoveFirst();
AddLast(item);
}
}
}
并测试:
public class Item : IQueueItem
{
public long Id { get; set; }
//...
}
public class Program
{
public static void Main()
{
var q = new CircularBuffer<Item>(10);
Parallel.For(0, 15, i => q.Enqueue(new Item()));
Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
}
}
它给出了正确的输出(即使被竞争线程排队,也是有序的,并且具有固定的大小,最旧的项目出队):
6, 7, 8, 9, 10, 11, 12, 13, 14, 15
实际上,我有读取(即枚举)该队列的网络请求。
问题:如果一个线程正在枚举队列,而另一个线程正在向队列添加内容,则会出现错误。 (我可以在读取之前使用 ToList()
,但是对于一个大型队列来说,它将占用所有服务器的内存,因为这可能会通过多个请求每秒执行多次)。我该如何处理这种情况? 我使用了链表,但我可以灵活地使用任何结构。
(另外,这似乎是一个非常重的锁定部分;有没有更高效的方法?)
更新
正如下面的评论中所询问的:我期望队列有几百到几万个项目,但项目本身很小(只有一些原始数据类型)。我预计每秒都会排队。从 Web 请求读取的频率较低,假设每分钟读取几次(但可能与服务器写入队列同时发生)。
最佳答案
根据您在问题中提供的指标,您有很多选择。 CircularBuffer<T>
的预期用途并没有那么重。包装lock
- protected Queue<T>
应该工作得很好。每次枚举时将队列内容复制到数组中(每分钟复制 10,000 个元素几次)的成本不太明显。现代机器可以在眨眼之间完成这些事情。您必须每秒枚举数百次集合,这才会开始(稍微)成为一个问题。
在我原来的答案( revision 3 )中,我建议使用 ImmutableQueue<T>
作为底层存储。经过仔细检查,我发现这个类(class)并不完全是付费类(class)。第一次枚举时,它会调用内部 BackwardsReversed
属性(property)(source code),这是相当昂贵的。我的性能测试证实,这是一个比简单的 lock
更糟糕的解决方案。 - protected Queue<T>
如 lonix's 所示答案,关于 CPU 时间和分配。
下面是类似想法的较低级别实现,它利用了这样一个事实:我们只需要 ImmutableQueue<T>
的功能子集。类(class)。这些项目存储在单链表结构中,无需额外成本即可枚举:
public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
private readonly object _locker = new();
private readonly int _capacity;
private Node _head;
private Node _tail;
private int _count = 0;
private long _lastId = 0;
private class Node
{
public readonly T Item;
public Node Next;
public Node(T item) => Item = item;
}
public ConcurrentCircularBuffer(int capacity)
{
if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
_capacity = capacity;
}
public int Count => Volatile.Read(ref _count);
public void Enqueue(T item)
{
lock (_locker)
{
Node node = new(item);
if (_head is null) _head = node;
if (_tail is not null) _tail.Next = node;
_tail = node;
if (_count < _capacity) _count++; else _head = _head.Next;
item.Id = ++_lastId;
}
}
public IEnumerator<T> GetEnumerator()
{
Node node; int count;
lock (_locker) { node = _head; count = _count; }
for (int i = 0; i < count && node is not null; i++, node = node.Next)
yield return node.Item;
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
与 lock
相比,这种方法的主要优点- protected Queue<T>
是它最大限度地减少了争用。持有锁时完成的工作微不足道。
ConcurrentCircularBuffer<T>
的替代实现类,基于两个数组缓冲区并具有不同的优缺点,可以在 5th revision 中找到这个答案。
关于c# - 具有序列 ID 的线程安全固定大小循环缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72800733/