c# - 以原子方式从 ConcurrentQueue 中获取所有内容

标签 c# multithreading collections queue concurrent-queue

我有多个线程生成项目并将它们粘贴在一个共同的 ConcurrentQueue 中:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

我有另一个单一的消费者线程,但它需要在这个应用程序的上下文中工作的方式是,偶尔,它只需要捕获当前线程队列中的所有内容,将其从该队列中删除,一键完成。就像是:
private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

我想我查看了所有文档(关于集合及其实现的接口(interface)),但我似乎没有找到类似“同时从队列中获取所有对象”,甚至“同时与另一个队列交换内容”之类的东西。

如果我放弃 ConcurrentQueue,我可以做到这一点。并且只保护一个正常的Queuelock , 像这样:
private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

但是,我喜欢 ConcurrentQueue 的便利性而且因为我刚刚学习 C#,所以我对 API 很好奇;所以我的问题是,有没有办法用其中一个并发集合来做到这一点?

是否有某种方法可以访问任何同步对象 ConcurrentQueue为自己的目的使用并锁定它,以便一切都很好地融合在一起?然后我可以锁定它,拿走所有东西,然后释放?

最佳答案

这取决于你想做什么。根据 source code 中的评论

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

这通过内部调用 ToList() 来工作。这反过来适用于m_numSnapshotTakers和自旋机制
/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

如果您只需要快照,那么您很幸运。但是,似乎没有内置方法可以从 ConcurrentQueue 中获取和删除所有项目。以线程安全的方式。您需要使用 lock 烘焙自己的同步。或类似的。或者自己动手(从源头上看可能并不难)。

关于c# - 以原子方式从 ConcurrentQueue 中获取所有内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60158415/

相关文章:

c# - 输出私有(private)字符串子

c# - 将多个参数从 c# 传递到 python

c# - ServiceStack JsonSerializer 不序列化公共(public)成员

java - 具有非唯一键的排序映射

java - 以相反的顺序更改 LinkedHashMap 项

c# - C#winmm.dll mp3播放器快进方法?

java - 多线程中 Collection.sort() 中的 NullpointerException

c - 如何创建轻量级内核线程?

python - 队列和集合 Python 之间的区别

java - 如何一次删除不同的可更新记录?