c# - 有界阻塞集合会在广告期间丢失数据吗

标签 c# multithreading parallel-processing concurrentdictionary blockingcollection

我有一个 BlockingCollection(ConcurrentBag, 50000),我在其中尝试对生产者线程使用非常小的 50,000 有界容量,以便最大限度地增加我在消费者线程的 ConcurrentDictionary 中可以处理的记录数。生产者比消费者快得多,否则会消耗大部分内存。

不幸的是,我立即注意到我的 ConcurrentDictionary 中的记录总数现在大大低于在我的测试数据执行时添加 50,000 的限制容量后应有的记录总数。我读到 BlockingCollection 的 .add 方法应该无限期地阻塞,直到集合中有空间供 add 执行。然而,情况似乎并非如此。

问题:

  1. 如果在 BlockingCollection 中的容量释放之前调用了过多的 add 方法,BlockingCollection 的 .add 方法是否最终会超时或静默失败?

  2. 如果对 #1 的回答是肯定的,那么在超出边界容量后我可以在不丢失数据的情况下尝试添加多少次?

  3. 如果调用了许多等待/阻塞容量的 BlockingCollection .add() 方法,并且调用了 CompleteAdding() 方法,那些等待/阻塞的添加会继续等待然后最终添加,还是默默地失败?

最佳答案

如果您将 BlockingCollection 与 ConcurrentDictionary 一起使用,请确保您没有在代码中某处隐藏的 BlockingCollection.TryAdd(myobject) 方法,而将其误认为是 ConcurrentDictionary.TryAdd() 方法。如果超出 BlockingCollection 的边界容量,BlockingCollection.TryAdd(myobject) 将返回 false 并丢弃产生“静默失败”的添加请求。

  1. BlockingCollection 的 .Add() 方法不会出现“静默失败”或在大量超出边界容量后丢失添加。如果有太多 .add() 正在等待添加到超出容量的 BlockingCollection 中,add() 方法最终将导致进程耗尽内存。 (这必须是流量控制问题的一个非常极端的案例)
  2. 参见 #1。
  3. 我自己的测试似乎表明,一旦调用 CompleteAdding() 方法,所有后续添加都会失败,如 MSDN 文档中所述。

关于性能的最后说明

与在同一进程中不使用边界容量和 .TryAdd() 相比,在 BlockingCollection 上使用边界容量和 .Add() 似乎(无论如何在我自己的情况下)非常慢。

通过实现我自己的边界容量策略,我取得了更好的性能结果。有很多方法可以做到这一点。三种选择包括 Thread.Sleep()、Thread.Spinwait() 或与 Monitor.PulseAll() 一起使用的 Monitor.Wait()。当使用这些策略之一时,也可以使用 BlockingCollection.TryAdd() 而不是 BlockingCollection.Add() 并且没有边界容量而不会丢失任何数据或耗尽内存。这种方法似乎也能产生更好的性能。

您可以根据生产者线程和消费者线程中的速度差异最适合的场景从三个示例中进行选择。

Thread.Wait() 示例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the bounded capacity has been exceeded
    //place the thread in wait mode 
    Thread.Sleep(SleepTime);
}

Thread.SpinWait() 示例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the capacity has been exceeded
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount);
}  

Monitor.Wait() 示例

这个例子需要在生产者和消费者端都有一个钩子(Hook)。

生产者代码

//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}

消费者代码

//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count < 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}

关于c# - 有界阻塞集合会在广告期间丢失数据吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13150998/

相关文章:

c# - 可以确定方法是由派生类调用还是直接作为自身调用? C#

java - 如何解决 "Exception in thread "Animation Thread"java.lang.NullPointerException"?

.net - 经典ASP调用.NET组件-如何处理线程?

bash - 如何并行运行来自不同目录的 2 个或更多脚本

c# - Linq2SQL 表达式没有翻译的解决方案

c# - DataTable.Load 跳到多结果 DataReader 中的下一个结果集

c# - 获取文本的行数

java - 使用 FileLock 在 Java 中锁定文件

c - 如何并行比较两个以上的数字?

concurrency - 在 Ada 任务中创建过程或函数