在将 TPL 数据流移植到我的生产代码中之前,我正在试验它。 生产代码是一个经典的生产者/消费者系统——生产者产生消息(与金融领域相关),消费者处理这些消息。
我感兴趣的是,如果在某个时候生产者的生产速度比消费者可以处理的速度快得多(系统会崩溃,或者会发生什么)以及更重要的是该怎么做,环境将如何保持稳定在这些情况下。
因此,为了尝试拥有类似的简单应用程序,我提出了以下建议。
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Item
是一个非常简单的类
internal class Item
{
public Item(string itemId)
{
ItemId = itemId;
}
public string ItemId { get; }
}
GenerateItem
简单的新闻Item
static Item GenerateItem()
{
return new Item(Guid.NewGuid().ToString());
}
现在,模仿没那么快的消费者-我做了ProcessItem
持有 100ms
.
static async Task ProcessItem(Item item)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"Processing #{item.ItemId} item.");
}
执行此操作会在 20 秒左右出现 OOM 异常。
然后我继续添加更多的消费者(最多 10 个更多的 ActionBlock),这会赢得更多时间,但最终会导致相同的 OOM 异常。
我也注意到GC承受着巨大的压力(VS 2015诊断工具显示GC几乎一直在运行),所以我为ConcurrentBag
引入了对象池(很简单,本质上就是Item
存储项目) ,但我仍然碰壁(抛出 OOM 异常)。
详细说明内存中有什么,为什么内存不足。
- 最大尺寸的对象类型为
SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item>
&ConcurrentQueue+Segment<TplDataFlow.Item>
- 我看到了
BufferBlock
的 InputBuffer 充满了Item
小号(计数=14,562,296) - 自从我设置了
BoundedCapacity
对于ActionBlock
(s),他们的输入缓冲区也接近于配置的数量(InputCount=99,996)
为了确保较慢的生产者能够让消费者跟上,我让生产者在迭代之间休眠:
for (int i = 0; i < int.MaxValue; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(50));
bufferBlock.SendAsync(GenerateItem());
}
而且它工作正常 - 没有抛出异常,内存使用率一直很低,我再也看不到任何 GC 压力。
所以我有几个问题
- 在尝试使用 TPL 数据流构建 block 重现非常快的生产者/缓慢的消费者场景时,我是否做错了什么
- 有什么方法可以使这项工作正常进行而不因 OOM 异常而失败。
- 有关如何在 TPL 数据流上下文中处理此类场景(非常快的生产者/缓慢的消费者)的最佳实践的任何评论/链接。
- 我对这个问题的理解是 - 由于消费者跟不上,
BufferBlock
的内部缓冲区很快就会被消息填满,并且会延迟消息,直到一些消费者回来请求下一条消息,结果应用程序内存不足(由于BufferBlock
的内部缓冲区已满)-你同意吗?
我正在使用 Microsoft.Tpl.Dataflow
包-版本 4.5.24。
.NET 4.5(C#6)。进程是 32 位的。
最佳答案
您已经很好地确定了问题:BufferBlock
正在填充其输入缓冲区,直到它遇到 OOM。
要解决这个问题,您还应该向缓冲区 block 添加一个 BoundedCapacity
选项。这将自动为您限制生产者(生产者中不需要 Thread.Sleep
)。
关于c# - TPL 数据流 - 非常快的生产者,不是那么快的消费者 OutOfMemory 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40340274/