我有一个生成器,它按突发生成整数(几秒钟内从 1 到 50)。我有一个消费者按 block 消费这些整数。
我想让消费者在生产者爆完后开始消费(我没有对生产者的领先,我只知道它已经完成生产5秒没有生产)。
我想到了这两种不同的方式:
第一:使用一种消费者通知另一个消费者:
private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);
//This methods consumes the id from the producer
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
_mainWaiter.Set();
_secondaryWaiter.Set();
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock()
{
while(true)
{
_mainWaiter.Wait();
while(_secondaryWaiter.Wait(5000));
List<int> localIds;
lock(_ids)
{
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
}
第二:有一种 token 用于最后一次更新
//This methods consumes the id from the producer
private int _lastToken;
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
ThreadPool.Queue(()=>ConsumerIdByBlock(++_lastToken));
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock(int myToken)
{
Thread.Sleep(5000);
List<int> localIds;
lock(_ids)
{
if(myToken !=_lastToken)
return;
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
但我发现这些方法对于执行此操作来说有点太复杂了。是否存在 native /更简单的解决方案?你会怎么做?
最佳答案
如果您使用已经具有通知等的线程安全队列,这将变得容易得多。 BlockingCollection使编写生产者-消费者的东西变得非常容易。
我喜欢您的“链接消费者”想法,因为您不必为了使用它而修改生产者。也就是说,生产者只是把东西塞进队列。消费者最终如何使用它是无关紧要的。然后,使用 BlockingCollection
,您将拥有:
BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();
您的生产者通过调用 inputQueue.Add
将内容添加到输入队列。您的中间消费者(称之为整合者)通过调用 TryTake 从队列中获取东西。超时。例如:
List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
ItemType t;
while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5))
{
items.Add(t);
}
if (items.Count > 0)
{
// Add this list of items to the intermediate queue
intermediateQueue.Add(items);
items = new List<ItemType>();
}
}
第二个消费者只是读取中间队列:
foreach (var itemsList in intermediateQueue.GetConsumingEnumerable))
{
// do something with the items list
}
不需要 ManualResetEvent
或 lock
或其中任何一个; BlockingCollection
为您处理所有困惑的并发问题。
关于c# - 延迟生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23410151/