C#对许多相对较大的对象进行垃圾回收

标签 c# .net .net-core memory-management garbage-collection

我有几个进程轮询不同的数据源,以获取某种特定类型的信息。它们在后台频繁地进行轮询,所以当我需要这些信息时,它随时可用,不需要耗费时间的往返请求。
示例代码如下所示:

/// <summary>
/// Polls every configured journal data source in the background and keeps the most
/// recently stored list of entries per data source key in <see cref="ResultsBuffer"/>.
/// Polling starts from the constructor and never stops.
/// </summary>
public class JournalBackgroundPoller
{
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    private readonly Dictionary<int, string> _journalAddresses;

    // NOTE(review): this single Random instance is used by every polling loop; Random
    // instance members are not guaranteed to be thread-safe — confirm the loops cannot
    // call it concurrently.
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    /// <summary>Most recently stored entries, keyed by data source key.</summary>
    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    /// <summary>
    /// Creates the poller and immediately starts one polling loop per journal address.
    /// </summary>
    /// <param name="configuration">Application configuration; stored but not read in this class.</param>
    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();

        _configuration = configuration;
        _clusterSize = 20;//for the sake of demo

        // Demo addresses (key -> address).
        _journalAddresses = new Dictionary<int, string>
        {
            { 1, "SOME ADDR1" },
            { 2, "SOME ADDR 2" }
        };

        _runHolder = BuildAndRun();
    }

    /// <summary>
    /// Builds one normalize-then-store pipeline per data source, links each pipeline to a
    /// shared broadcast block, and starts the polling loops.
    /// </summary>
    /// <returns>A task that completes only when every polling loop has completed.</returns>
    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

        PopulateShardsRegistry();

        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            // Only wrappers whose data source key matches this pipeline are accepted by it.
            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

            dataProcessor.LinkTo(dataStorer);
            // Presumably discards output that dataStorer does not accept — confirm.
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }

        return Task.WhenAll(pollingTasks);
    }

    /// <summary>
    /// Ensures <see cref="ResultsBuffer"/> has an entry (an empty list unless one already
    /// exists) for every shard index from 0 to <c>_clusterSize - 1</c>.
    /// </summary>
    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
            Console.WriteLine(e);
        }
    }

    /// <summary>
    /// Endlessly polls one data source, posts each result to <paramref name="buffer"/>, and
    /// waits 400-600 ms between polls. On failure an empty result is posted instead.
    /// </summary>
    /// <param name="dataSourceInfo">Key and address of the data source to poll.</param>
    /// <param name="buffer">Broadcast block that feeds the per-source pipelines.</param>
    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                Console.WriteLine(ex);
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
            }

            await Task.Delay(_localRandom.Next(400, 601));
        }
    }

    /// <summary>
    /// Applies per-entry transformations to a polled result.
    /// </summary>
    /// <param name="input">The polled result to normalize.</param>
    /// <returns>
    /// The same <paramref name="input"/> instance, or <c>null</c> when normalization threw.
    /// </returns>
    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
        try
        {
            // Nothing to normalize for a missing or empty list.
            if (input.JournalEntryResponseItems == null || input.JournalEntryResponseItems.Count == 0)
            {
                return input;
            }

            foreach (var journalEntry in input.JournalEntryResponseItems)
            {
                //do some transformations here
            }

            return input;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            Console.WriteLine(ex);
            return null;
        }
    }

    /// <summary>
    /// Replaces the stored entries for the wrapper's data source key with the wrapper's list.
    /// </summary>
    /// <param name="input">Normalized result; <c>null</c> when normalization failed.</param>
    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        // NormalizeValues returns null on failure; keep the previously stored data in that case
        // instead of failing on a null dereference.
        if (input == null)
        {
            return;
        }

        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
            Console.WriteLine(ex);
        }
    }
}
为简单起见,期刊相关实体将如下所示:
/// <summary>
/// Simplified journal entry carrying two placeholder string values.
/// </summary>
internal class JournalEntryResponseItem
{
    /// <summary>First placeholder value.</summary>
    public string SomeProperty1 { get; set; }

    /// <summary>Second placeholder value.</summary>
    public string SomeProperty2 { get; set; }
}

/// <summary>
/// Pairs a list of journal entries with the data source they belong to.
/// </summary>
internal class JournalResponsesWrapper
{
    /// <summary>Data source as a key/value pair (integer key, string value).</summary>
    public KeyValuePair<int, string> JournalDataSource { get; set; }

    /// <summary>Journal entries associated with <see cref="JournalDataSource"/>.</summary>
    public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}
提供的代码的全局问题显然是我正在创建相对大量的对象,这些对象很可能在短时间内就进入 LOH(大对象堆)。数据源总是提供最新的条目,所以我不需要保留旧的条目(我也做不到,因为这些条目彼此无法区分)。我的问题是:是否可以优化内存使用、对象创建和替换的往返,以便降低垃圾回收的频率?现在,垃圾回收大约每 5-10 秒发生一次。
UPD 1:我通过 ResultsBuffer 访问数据并且可以在刷新之前多次读取相同的集合。不能保证一个特定的数据集只会被读取一次(或根本不会被读取)。我的大对象是List<JournalEntryResponseItem>实例,最初来自数据源,然后保存到 ResultsBuffer .
UPD 2:数据源只有一个端点,一次返回这个“分片”中的所有实体,我无法在请求期间应用过滤。响应实体没有唯一的键/标识符。
UPD 3:一些答案建议先衡量/分析应用程序。虽然在这种特殊情况下这是完全有效的建议,但由于以下观察结果,它显然与内存/GC 相关:
  • 肉眼可见的卡顿恰好发生在应用程序 RAM 消耗稳定增长一段时间后急剧下降的那一刻。
  • 如果我再添加 X 个日志源,应用程序的内存将持续增长,直到占用服务器上的所有可用内存,然后卡住更长时间(1-3 秒),之后内存急剧下降,应用程序继续工作,直到再次达到内存限制。
  • 最佳答案

    由于 List<T> 背后始终是一个由连续元素组成的 T[] 数组,将其容量设为 200000 肯定会使它直接进入 LOH。为了避免这种情况,我建议使用简单的逻辑分区来代替物理上的一次性分配,并分批 Post 该列表。这样,在每次轮询期间,这个巨大的列表仍会进入 LOH,但会在下一次第 2 代 GC 中被回收(请确保不再有对它的引用)。LOH 几乎为空,但由于托管堆中新增的复制操作,第 2 代 GC 的次数会比以前更多。这是一个很小的改动,下面是新的 JournalBackgroundPoller 类(class):

    /// <summary>
    /// Polls every configured journal data source in the background, posts each poll result
    /// in fixed-size partitions, and stores the received lists per data source key in
    /// <see cref="ResultsBuffer"/>. Polling starts from the constructor and never stops.
    /// </summary>
    public class JournalBackgroundPoller
    {
        private readonly int _clusterSize;

        private readonly IConfiguration _configuration;

        private readonly Dictionary<int, string> _journalAddresses;

        // NOTE(review): this single Random instance is used by every polling loop; Random
        // instance members are not guaranteed to be thread-safe — confirm the loops cannot
        // call it concurrently.
        private readonly Random _localRandom;
        private readonly Task _runHolder;

        /// <summary>Most recently stored entries, keyed by data source key.</summary>
        internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

        /// <summary>
        /// Creates the poller and immediately starts one polling loop per journal address.
        /// </summary>
        /// <param name="configuration">Application configuration; stored but not read in this class.</param>
        public JournalBackgroundPoller(IConfiguration configuration)
        {
            _localRandom = new Random();

            _configuration = configuration;
            _clusterSize = 20;//for the sake of demo

            // Demo addresses (key -> address).
            _journalAddresses = new Dictionary<int, string>
            {
                { 1, "SOME ADDR1" },
                { 2, "SOME ADDR 2" }
            };

            _runHolder = BuildAndRun();
        }

        /// <summary>
        /// Builds one normalize-then-store pipeline per data source, links each pipeline to a
        /// shared broadcast block, and starts the polling loops.
        /// </summary>
        /// <returns>A task that completes only when every polling loop has completed.</returns>
        private Task BuildAndRun()
        {
            var pollingTasks = new List<Task>();
            var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

            PopulateShardsRegistry();

            foreach (var js in _journalAddresses)
            {
                var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                    new ExecutionDataflowBlockOptions
                    { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

                var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

                // Only wrappers whose data source key matches this pipeline are accepted by it.
                buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

                dataProcessor.LinkTo(dataStorer);
                // Presumably discards output that dataStorer does not accept — confirm.
                dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

                pollingTasks.Add(PollInfinitely(js, buffer));
            }

            return Task.WhenAll(pollingTasks);
        }

        /// <summary>
        /// Ensures <see cref="ResultsBuffer"/> has an entry (an empty list unless one already
        /// exists) for every shard index from 0 to <c>_clusterSize - 1</c>.
        /// </summary>
        private void PopulateShardsRegistry()
        {
            try
            {
                for (int i = 0; i < _clusterSize; i++)
                {
                    ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Could `t initialize shards registry");
                Console.WriteLine(e);
            }
        }

        /// <summary>
        /// Endlessly polls one data source, posts the result to <paramref name="buffer"/> in
        /// partitions of at most 1000 entries, and waits 400-600 ms between polls. On failure
        /// an empty result is posted instead.
        /// </summary>
        /// <param name="dataSourceInfo">Key and address of the data source to poll.</param>
        /// <param name="buffer">Broadcast block that feeds the per-source pipelines.</param>
        private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
        {
            while (true)
            {
                try
                {
                    //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                    var journalEntries = new List<JournalEntryResponseItem>(200000);

                    // NOTE:
                    // We need to avoid references to the huge list so GC collects it ASAP in the next
                    // generation 2 collection: after that, nothing else goes to the LOH.
                    const int PartitionSize = 1000;
                    for (var index = 0; index < journalEntries.Count; index += PartitionSize)
                    {
                        // The final partition may hold fewer than PartitionSize entries; asking
                        // GetRange for more than remains would throw.
                        var partitionLength = Math.Min(PartitionSize, journalEntries.Count - index);
                        var journalEntryResponseItems = journalEntries.GetRange(index, partitionLength);
                        buffer.Post(
                            new JournalResponsesWrapper
                            {
                                JournalDataSource = dataSourceInfo,
                                JournalEntryResponseItems = journalEntryResponseItems
                            });
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                    Console.WriteLine(ex);
                    buffer.Post(
                        new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
                }

                await Task.Delay(_localRandom.Next(400, 601));
            }
        }

        /// <summary>
        /// Applies per-entry transformations to a posted partition.
        /// </summary>
        /// <param name="input">The partition to normalize.</param>
        /// <returns>
        /// The same <paramref name="input"/> instance, or <c>null</c> when normalization threw.
        /// </returns>
        private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
        {
            try
            {
                // Nothing to normalize for a missing or empty list.
                if (input.JournalEntryResponseItems == null || input.JournalEntryResponseItems.Count == 0)
                {
                    return input;
                }

                foreach (var journalEntry in input.JournalEntryResponseItems)
                {
                    //do some transformations here
                }

                return input;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
                Console.WriteLine(ex);
                return null;
            }
        }

        /// <summary>
        /// Replaces the stored entries for the wrapper's data source key with the wrapper's list.
        /// </summary>
        /// <param name="input">Normalized partition; <c>null</c> when normalization failed.</param>
        private void StoreValuesInBuffer(JournalResponsesWrapper input)
        {
            // NormalizeValues returns null on failure; keep the previously stored data in that case
            // instead of failing on a null dereference.
            if (input == null)
            {
                return;
            }

            try
            {
                // NOTE(review): every received partition replaces the previous value for this key,
                // so a reader sees a single partition rather than the whole poll — confirm that is
                // the intended contract for ResultsBuffer.
                ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Could not write content to dictionary");
                Console.WriteLine(ex);
            }
        }
    }

    请看一下 30 秒后原始内存使用情况的快照
    Before the optimization
    这是30秒后优化内存使用的快照
    After the optimization
    注意区别
  • 稀疏数组:JournalEntryResponseItem[] 从长度 200,000、浪费 1,600,000 降到没有。
  • LOH 使用量:从 3.05 MB 降到没有。
  • 关于C#对许多相对较大的对象进行垃圾回收,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64878714/

    相关文章:

    c# - 微软统一。如何在构造函数中指定某个参数?

    c# - org.xml.sax.SAXParseException : Character reference "&#x0" is an invalid XML character

    C# 正则表达式问题 "unrecognized escape sequence"

    c# - 如何等待 task.run

    c# - 用圆括号包围 lambda 表达式参数

    C#/.NET 如何从类名中找到包含的命名空间

    macos - 在 Mac OS X 上支持多个 .NET Core 版本

    c# - 正则表达式错误 : expression took longer than ms to execute

    javascript - 如何从 JavaScript 运行 Razor 函数

    msbuild - 将 NuGet 构建工具包与 ASP.NET Core 结合使用