我有几个进程在后台轮询不同的数据源,以获取某一特定类型的信息。它们轮询得很频繁,因此当我需要这些信息时,数据随时可用,无需再进行一次浪费时间的往返请求。
示例代码如下所示:
/// <summary>
/// Polls each configured journal data source in a background loop and keeps the most recently
/// delivered entries per source key in <see cref="ResultsBuffer"/>.
/// </summary>
public class JournalBackgroundPoller
{
    private readonly int _clusterSize;
    private readonly IConfiguration _configuration;
    private readonly Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    // Random is not thread-safe. The polling loops resume on thread-pool threads after each
    // await and can call Next concurrently, so every call is serialized through this gate.
    private readonly object _randomGate = new object();
    private readonly Task _runHolder;

    /// <summary>
    /// Latest entries per source key. Seeded with an empty list for keys 0.._clusterSize-1 and
    /// overwritten by <see cref="StoreValuesInBuffer"/> whenever new data arrives.
    /// </summary>
    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    /// <summary>Creates the poller and immediately starts polling every configured source.</summary>
    /// <param name="configuration">Stored but not read by this demo class.</param>
    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();
        _configuration = configuration;
        _clusterSize = 20; //for the sake of demo
        _journalAddresses = new Dictionary<int, string>
        {
            { 1, "SOME ADDR1" },
            { 2, "SOME ADDR 2" }
        };
        _runHolder = BuildAndRun();
    }

    /// <summary>
    /// Wires one normalize → store pipeline per data source behind a shared broadcast block and
    /// starts one polling loop per source.
    /// </summary>
    /// <returns>A task that completes only when all polling loops complete.</returns>
    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
        PopulateShardsRegistry();
        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            // Each pipeline only receives messages for its own source key.
            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
            // NormalizeValues returns null on failure; keep those away from the store so the last
            // good entries for this source stay in ResultsBuffer.
            dataProcessor.LinkTo(dataStorer, wrapper => wrapper != null);
            // Fallback: whatever the storer does not take (null results, and possibly items offered
            // while its bounded input is full) is dropped instead of staying queued here.
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }
        return Task.WhenAll(pollingTasks);
    }

    /// <summary>Seeds <see cref="ResultsBuffer"/> with an empty list for every key in [0, _clusterSize).</summary>
    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
            Console.WriteLine(e);
        }
    }

    /// <summary>
    /// Polls one data source forever, posting each result to <paramref name="buffer"/> and waiting
    /// 400–600 ms between polls. A failed poll posts an empty list for the source.
    /// </summary>
    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                Console.WriteLine(ex);
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
            }
            await Task.Delay(NextPollDelayMilliseconds());
        }
    }

    /// <summary>Returns a random delay in [400, 600] ms, guarding the shared Random instance.</summary>
    private int NextPollDelayMilliseconds()
    {
        lock (_randomGate)
        {
            return _localRandom.Next(400, 601);
        }
    }

    /// <summary>
    /// Applies per-entry transformations to a poll result.
    /// </summary>
    /// <returns>The same wrapper on success (including empty or null lists), or null when a transformation throws.</returns>
    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
        try
        {
            if (input.JournalEntryResponseItems == null || input.JournalEntryResponseItems.Count == 0)
            {
                return input;
            }
            foreach (var journalEntry in input.JournalEntryResponseItems)
            {
                //do some transformations here
            }
            return input;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            Console.WriteLine(ex);
            return null;
        }
    }

    /// <summary>Replaces the stored entries for the wrapper's source key with the wrapper's list.</summary>
    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
            Console.WriteLine(ex);
        }
    }
}
为简单起见,期刊相关实体将如下所示:
/// <summary>
/// One journal entry as returned by a data source (simplified demo shape with two string properties).
/// </summary>
internal class JournalEntryResponseItem
{
    /// <summary>First demo property.</summary>
    public string SomeProperty1 { get; set; }

    /// <summary>Second demo property.</summary>
    public string SomeProperty2 { get; set; }
}
/// <summary>
/// Message passed through the polling pipeline: which data source a poll came from and the
/// entries that poll returned.
/// </summary>
internal class JournalResponsesWrapper
{
    /// <summary>Entry from the poller's address map: Key is the source id, Value its address.</summary>
    public KeyValuePair<int, string> JournalDataSource { get; set; }

    /// <summary>Entries returned by the poll; may be empty.</summary>
    public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}
上述代码的主要问题显然在于:我在短时间内创建了相当多的较大对象,它们很可能会被分配到大对象堆(LOH)上。数据源总是返回最新的全部条目,所以我不需要保留旧条目(实际上也无法保留,因为条目之间无法区分)。我的问题是:能否优化内存使用、对象创建以及数据替换的往返过程,从而降低垃圾回收的频率?目前大约每 5–10 秒就会发生一次垃圾回收。

UPD 1:我通过 `ResultsBuffer` 访问数据,并且在刷新之前可能会多次读取同一个集合。无法保证某个数据集只会被读取一次(也可能根本不会被读取)。我的大对象是 `List<JournalEntryResponseItem>` 实例,它们最初来自数据源,随后被保存到 `ResultsBuffer` 中。

UPD 2:数据源只有一个端点,它会一次性返回该“分片”中的所有实体,我无法在请求时进行过滤。响应实体没有唯一的键或标识符。
UPD 3:有些回答建议先对应用程序进行测量/性能分析。在这种情况下,这本身是完全合理的建议;但从以下观察结果可以明显看出,问题确实与内存/GC 有关:
最佳答案
`List<T>` 的背后始终是一个存放连续元素的 `T[]` 数组,将其容量设为 200000 必然会使该数组直接分配到 LOH 上。为避免这种情况,我建议不要按物理大小一次性处理,而是做简单的逻辑分区,并分批 `Post` 列表。这样,每次轮询时巨大的列表仍会进入 LOH,但它会在下一次第 2 代(Gen 2)GC 时被回收(前提是确保不再有对它的引用)。LOH 几乎保持为空,但由于在托管堆上额外进行了复制操作,第 2 代 GC 的次数会比以前更多。改动很小,下面是新的 `JournalBackgroundPoller` 类:
/// <summary>
/// Variant of the poller that forwards each poll result downstream in fixed-size partitions
/// instead of as one large list, so no long-lived reference to the large list is kept.
/// </summary>
public class JournalBackgroundPoller
{
    // Number of entries copied into each partition posted to the pipeline.
    private const int PartitionSize = 1000;

    private readonly int _clusterSize;
    private readonly IConfiguration _configuration;
    private readonly Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    // Random is not thread-safe. The polling loops resume on thread-pool threads after each
    // await and can call Next concurrently, so every call is serialized through this gate.
    private readonly object _randomGate = new object();
    private readonly Task _runHolder;

    /// <summary>
    /// Latest entries per source key. Seeded with an empty list for keys 0.._clusterSize-1 and
    /// overwritten by <see cref="StoreValuesInBuffer"/> whenever new data arrives.
    /// </summary>
    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    /// <summary>Creates the poller and immediately starts polling every configured source.</summary>
    /// <param name="configuration">Stored but not read by this demo class.</param>
    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();
        _configuration = configuration;
        _clusterSize = 20; //for the sake of demo
        _journalAddresses = new Dictionary<int, string>
        {
            { 1, "SOME ADDR1" },
            { 2, "SOME ADDR 2" }
        };
        _runHolder = BuildAndRun();
    }

    /// <summary>
    /// Wires one normalize → store pipeline per data source behind a shared broadcast block and
    /// starts one polling loop per source.
    /// </summary>
    /// <returns>A task that completes only when all polling loops complete.</returns>
    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
        PopulateShardsRegistry();
        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            // Each pipeline only receives messages for its own source key.
            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
            // NormalizeValues returns null on failure; keep those away from the store.
            dataProcessor.LinkTo(dataStorer, wrapper => wrapper != null);
            // Fallback: whatever the storer does not take (null results, and possibly items offered
            // while its bounded input is full) is dropped instead of staying queued here.
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }
        return Task.WhenAll(pollingTasks);
    }

    /// <summary>Seeds <see cref="ResultsBuffer"/> with an empty list for every key in [0, _clusterSize).</summary>
    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
            Console.WriteLine(e);
        }
    }

    /// <summary>
    /// Polls one data source forever, posting the result to <paramref name="buffer"/> in
    /// partitions of at most <see cref="PartitionSize"/> entries and waiting 400–600 ms between
    /// polls. An empty result or a failed poll posts a fresh empty list for the source.
    /// </summary>
    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                if (journalEntries.Count == 0)
                {
                    // Nothing to partition: still refresh the stored entries, but with a new empty
                    // list so the large pre-sized list is not kept alive downstream.
                    buffer.Post(
                        new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
                }

                // Only copies of the partitions are handed downstream, so nothing retains a reference
                // to the large list. Its backing array (200000 references) is on the LOH, so it is
                // reclaimed at a gen-2 collection; every poll still allocates a new one.
                for (var index = 0; index < journalEntries.Count; index += PartitionSize)
                {
                    // The last partition is usually shorter than PartitionSize; GetRange throws if
                    // asked to read past the end of the list.
                    var partitionLength = Math.Min(PartitionSize, journalEntries.Count - index);
                    var journalEntryResponseItems = journalEntries.GetRange(index, partitionLength);
                    buffer.Post(
                        new JournalResponsesWrapper
                        {
                            JournalDataSource = dataSourceInfo,
                            JournalEntryResponseItems = journalEntryResponseItems
                        });
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                Console.WriteLine(ex);
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
            }
            await Task.Delay(NextPollDelayMilliseconds());
        }
    }

    /// <summary>Returns a random delay in [400, 600] ms, guarding the shared Random instance.</summary>
    private int NextPollDelayMilliseconds()
    {
        lock (_randomGate)
        {
            return _localRandom.Next(400, 601);
        }
    }

    /// <summary>
    /// Applies per-entry transformations to a posted partition.
    /// </summary>
    /// <returns>The same wrapper on success (including empty or null lists), or null when a transformation throws.</returns>
    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
        try
        {
            if (input.JournalEntryResponseItems == null || input.JournalEntryResponseItems.Count == 0)
            {
                return input;
            }
            foreach (var journalEntry in input.JournalEntryResponseItems)
            {
                //do some transformations here
            }
            return input;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            Console.WriteLine(ex);
            return null;
        }
    }

    /// <summary>Replaces the stored entries for the wrapper's source key with the wrapper's list.</summary>
    /// <remarks>
    /// NOTE(review): every partition posted by PollInfinitely replaces the previous one here. As a
    /// result, ResultsBuffer holds only the last partition that reached this block, not the full
    /// poll result. The broadcast and bounded blocks upstream may also drop partitions. Keeping
    /// complete snapshots without one large list would need a different value type for
    /// ResultsBuffer (e.g. a list of partitions), which is outside this class's current interface.
    /// </remarks>
    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
            Console.WriteLine(ex);
        }
    }
}
下面是运行 30 秒后原始版本的内存使用快照:
下面是运行 30 秒后优化版本的内存使用快照:
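请注意 `JournalEntryResponseItem[]` 的差异:从浪费的 1,600,000 字节(长度 200,000)降到了零。(需要说明的是:示例代码中的 `journalEntries` 只预设了容量,其 `Count` 为 0,因此分批循环实际上不会执行,而每次轮询仍然会分配这个长度为 200,000 的数组。快照中的差异反映的是该数组不再被 `ResultsBuffer` 引用,而不是它不再被分配。)

关于 C# 中大量较大对象的垃圾回收,Stack Overflow 上有一个类似的问题:https://stackoverflow.com/questions/64878714/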