完整的可重现代码是 on github ,启动可执行文件后,内存很快就会飙升。代码主要位于 AsyncBlockingQueue.cs
类中。
以下代码实现了一个简单的异步“阻塞”队列:
public async Task<T> DequeueAsync(
int timeoutInMs = -1,
CancellationToken cancellationToken = default)
{
try
{
using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
{
T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
return value;
}
}
catch (ChannelClosedException cce)
{
await Console.Error.WriteLineAsync("Channel is closed.");
throw new ObjectDisposedException("Queue is disposed");
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync("Dequeue failed.");
throw;
}
}
private CancellationTokenSource GetCancellationTokenSource(
int timeoutInMs,
CancellationToken cancellationToken)
{
if (timeoutInMs <= 0)
{
return null;
}
CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
return cts;
}
这样使用时,会出现内存泄漏:
try
{
string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
// timeout
}
最佳答案
我能够重现您所观察到的问题。这显然是 Channels 中的一个缺陷图书馆恕我直言。这是我的重现:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static class Program
{
public static async Task Main()
{
var channel = Channel.CreateUnbounded<int>();
var bufferBlock = new BufferBlock<int>();
var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
var mem0 = GC.GetTotalMemory(true);
int timeouts = 0;
for (int i = 0; i < 10; i++)
{
var stopwatch = Stopwatch.StartNew();
while (stopwatch.ElapsedMilliseconds < 500)
{
using var cts = new CancellationTokenSource(1);
try
{
await channel.Reader.ReadAsync(cts.Token);
//await bufferBlock.ReceiveAsync(cts.Token);
//await asyncCollection.TakeAsync(cts.Token);
}
catch (OperationCanceledException) { timeouts++; }
}
var mem1 = GC.GetTotalMemory(true);
Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
+ $" Allocated: {mem1 - mem0:#,0} bytes");
}
}
}
输出:
1) Timeouts: 124, Allocated: 175,664 bytes
2) Timeouts: 250, Allocated: 269,720 bytes
3) Timeouts: 376, Allocated: 362,544 bytes
4) Timeouts: 502, Allocated: 453,264 bytes
5) Timeouts: 628, Allocated: 548,080 bytes
6) Timeouts: 754, Allocated: 638,800 bytes
7) Timeouts: 880, Allocated: 729,584 bytes
8) Timeouts: 1,006, Allocated: 820,304 bytes
9) Timeouts: 1,132, Allocated: 919,216 bytes
10) Timeouts: 1,258, Allocated: 1,011,928 bytes
每个操作会泄漏大约 800 个字节,这非常令人讨厌。每次在 channel 中写入新值时都会回收内存,因此对于繁忙的 channel 来说,这种设计缺陷不应该成为问题。但对于偶尔接收值的 channel 来说,这可能会成为一个阻碍。
还有其他可用的异步队列实现,它们不会遇到相同的问题。您可以尝试评论await channel.Reader.ReadAsync(cts.Token);
行并取消注释下面两行中的任何一行。您将看到 BufferBlock<T>
来自TPL Dataflow图书馆,以及 AsyncCollection<T>
来自Nito.AsyncEx.Coordination包,允许从队列中异步检索超时,没有内存泄漏。
关于c# - 具有 CancellationTokenSource 的 channel 在处理后出现超时内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67573683/