c# - 如何使用 BlockingCollection<T> 阻塞所有生产者直到所有项目被消费(批量消费)?

标签 c# producer-consumer blockingcollection

我阅读了一些其他类似但不同的链接,试图找到一些答案: How to consume a BlockingCollection<T> in batches

但是,(在上面的链接中)不使用 GetConsumingEnumerable 似乎有问题。

当消费者(应该是单数)清空集合时有效阻塞生产者的正确方法是什么?

[我们想要进行批处理,因为每个批处理都会执行一次 Web 服务调用,如果每条消息/项目都需要自己的调用,这将是一个瓶颈。批处理消息/项目是这个瓶颈的解决方案。]

理想情况下:

1)接收消息

2) 新的生产者任务插入集合

3) 当集合“满”(任意限制)时,阻止所有生产者,新的消费者任务消耗所有集合,然后为生产者解锁。

换句话说;我希望(并行生产者)xor(单个消费者)随时对集合进行操作。

似乎以前就应该这样做,但我似乎找不到专门这样做的代码片段。

感谢您的帮助。

最佳答案

使用这个模型,所有的工作都是完全序列化的,也就是说你永远不会同时有一个以上的“东西”在工作。要么生产者在工作,要么消费者在工作。因此,您实际上并不需要由生产者和消费者共同操作的集合,相反,您可以让生产者批量生产消费者在完成后消费的传统集合。它可能看起来像这样:

public Task<List<Thing>> Produce(Message message)
{
    //...
}

public Task Consume(List<Thing> data)
{
    //...
}

public async Task MessageReceived(Message message)
{
    while(HaveMoreBatches(message))
    {
        await Consume(await Produce(message));
    }
}

这让您可以生产一个批处理,然后使用它,然后生产另一个批处理,然后使用它,等等,直到没有更多的批处理可以生产。

关于c# - 如何使用 BlockingCollection<T> 阻塞所有生产者直到所有项目被消费(批量消费)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29478852/

相关文章:

c# - 使用具有单个 DbContext 和 Entites 的多个数据库并在运行时生成 Conn String

c# - 如果我已经创建了一个序列化程序集,为什么代码会编译一个序列化程序集?

java - 了解同步的使用

c# - BlockingCollection 在 10 秒内不会重试

c# - BlockingCollection 多个消费者

c# - 命名空间和使用范围解析

c# - WinForms 中的 header 控件 - 有这样的东西吗?

java - 具有批量和刷新功能的生产者/消费者

c# - 多生产者,单一消费者

java - 为什么PriorityBlockingQueue队列没有​​按照优先级对元素进行排序