c# - TPL 数据流 - 非常快的生产者,不是那么快的消费者 OutOfMemory 异常

标签 c# .net producer-consumer tpl-dataflow

在将 TPL 数据流移植到我的生产代码中之前,我正在试验它。 生产代码是一个经典的生产者/消费者系统——生产者产生消息(与金融领域相关),消费者处理这些消息。

我感兴趣的是,如果在某个时候生产者的生产速度比消费者可以处理的速度快得多(系统会崩溃,或者会发生什么)以及更重要的是该怎么做,环境将如何保持稳定在这些情况下。

因此,为了尝试拥有类似的简单应用程序,我提出了以下建议。

var bufferBlock = new BufferBlock<Item>();

var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
                        ,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();

Item是一个非常简单的类

internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}

GenerateItem简单的新闻Item

static Item GenerateItem()
{
   return new Item(Guid.NewGuid().ToString());
}

现在,模仿没那么快的消费者-我做了ProcessItem持有 100ms .

static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}

执行此操作会在 20 秒左右出现 OOM 异常。

然后我继续添加更多的消费者(最多 10 个更多的 ActionBlock),这会赢得更多时间,但最终会导致相同的 OOM 异常。

我也注意到GC承受着巨大的压力(VS 2015诊断工具显示GC几乎一直在运行),所以我为ConcurrentBag引入了对象池(很简单,本质上就是Item存储项目) ,但我仍然碰壁(抛出 OOM 异常)。

详细说明内存中有什么,为什么内存不足。

  • 最大尺寸的对象类型为 SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> & ConcurrentQueue+Segment<TplDataFlow.Item>
  • 我看到了 BufferBlock的 InputBuffer 充满了 Item小号(计数=14,562,296)
  • 自从我设置了BoundedCapacity对于 ActionBlock (s),他们的输入缓冲区也接近于配置的数量(InputCount=99,996)

为了确保较慢的生产者能够让消费者跟上,我让生产者在迭代之间休眠:

for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}

而且它工作正常 - 没有抛出异常,内存使用率一直很低,我再也看不到任何 GC 压力。

所以我有几个问题

  1. 在尝试使用 TPL 数据流构建 block 重现非常快的生产者/缓慢的消费者场景时,我是否做错了什么
  2. 有什么方法可以使这项工作正常进行而不因 OOM 异常而失败。
  3. 有关如何在 TPL 数据流上下文中处理此类场景(非常快的生产者/缓慢的消费者)的最佳实践的任何评论/链接。
  4. 我对这个问题的理解是 - 由于消费者跟不上,BufferBlock的内部缓冲区很快就会被消息填满,并且会延迟消息,直到一些消费者回来请求下一条消息,结果应用程序内存不足(由于 BufferBlock 的内部缓冲区已满)-你同意吗?

我正在使用 Microsoft.Tpl.Dataflow包-版本 4.5.24。 .NET 4.5(C#6)。进程是 32 位的。

最佳答案

您已经很好地确定了问题:BufferBlock 正在填充其输入缓冲区,直到它遇到 OOM。

要解决这个问题,您还应该向缓冲区 block 添加一个 BoundedCapacity 选项。这将自动为您限制生产者(生产者中不需要 Thread.Sleep)。

关于c# - TPL 数据流 - 非常快的生产者,不是那么快的消费者 OutOfMemory 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40340274/

相关文章:

mysql - 生产者消费者设置: How to handle Database Connections?

java - 客户端确认收到生产者消息

c# - .net mvc 路由贪婪正则表达式

c# - 为什么我无法使用 SqlCommand 创建触发器?

.net - 在 .NET 中进行保持事件套接字检查的最佳方法是什么?

c# - .NET Compact Framework 连接字符串加密/保护

c# - 为什么 WinHttpHandler 在生产服务器上导致 "Unable to load DLL ' api-ms-win-core-localization-l1-2-0.dll'”?

c# - 您如何解析图像标签的 HTML 字符串以获取 SRC 信息?

.net - 还有空间可以放置更好的 Visual Studio QuickWatch 窗口。有没有办法自己制作?

javascript - 如何为kafka主题创建消费者?