我有一个项目,它有一个 Sql-Server 数据库后端和 Dapper 作为 ORM。我正在尝试使用 Dapper 的 QueryAsync()
方法来获取一些数据。不仅如此,对我的 repo 的调用来自几个使用 Task.WhenAll
调用的任务(也就是说,每个任务都涉及从该 repo 获取数据,因此每个任务都在等待我的 repo 方法包装了 QueryAsync()
调用)。
问题是即使我正在使用 using
block ,我的 SqlConnections 也永远不会关闭。结果,我有 100 多个打开的数据库连接,并最终开始出现“达到最大池大小”异常。问题是,当我只切换到 Query()
而不是 QueryAsync()
时,它工作正常,但我希望能够异步执行此操作。
这是一个代码示例。我尽量模仿实际应用程序的结构,这就是为什么它看起来比实际情况更复杂的原因。
接口(interface):
public interface IFooRepository<T> where T: FooBase
{
Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}
实现:
public class FooRepository : RepositoryBase, IFooRepository<SpecialFoo>
{
private readonly IWebApiClientRepository _accountRepository;
public FooRepository(IWebApiClientRepository repo)
{
_accountRepository = repo;
}
public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
{
effectiveDate = effectiveDate ?? DateTime.Today.Date;
var referenceData = await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
using (var connection = new SqlConnection("iamaconnectionstring")
{
connection.Open();
try
{
var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
new
{
effectiveDate = effectiveDate.Value,
code = referenceData.Code
});
foreach (var item in res)
{
item.PropFromReference = referenceData.PropFromReference;
}
return res;
}
catch (Exception e)
{
//log
throw;
}
finally
{
connection.Close();
}
}
}
}
现在有了调用代码,有 2 层。我将从外部开始。我认为这就是问题所在。下方有评论。
人口:
public class Populator : PopulatorBase
{
private IAccountRepository _acctRepository;
public override async Task<IEnumerable<PopulationResult>> ProcessAccount(DateTime? popDate = null)
{
//My attempt at throttling the async calls
//I was hoping this would force a max of 10 simultaneous connections.
//It did not work.
SemaphoreSlim ss = new SemaphoreSlim(10,10);
var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();
List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
foreach (var item in accountNumbers)
{
await ss.WaitAsync();
trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
ss.Release();
}
//my gut tells me the issue is because of these tasks
var results = await Task.WhenAll(trackedTasks);
return results;
}
private async Task<ProcessResult>ProcessAccount(string accountCode, DateTime? popDate)
{
var createdItems = await _itemCreator.MakeExceptions(popDate, accountCode);
return Populate(accountCode, createdItems);
}
}
元素创造者:
public class ItemCreator : ItemCreatorBase
{
private readonly IFooRepository<FuturePosition> _fooRepository;
private readonly IBarRepository<FuturePosition> _barRepository;
public RussellGlobeOpFutureExceptionCreator() )
{
//standard constructor stuff
}
public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
{
DateTime reconDate = effectiveDate ?? DateTime.Today.Date;
//this uses the repository I outlined above
var foos = await _fooRepository.Select(account, effectiveDate);
//this repository uses a rest client, I doubt it's the problem
var bars = await _barRepository.Select(account, effectiveDate);
//just trying to make this example less lengthy
var foobars = MakeFoobars(foos, bars);
var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
return result;
}
}
就我所尝试的而言:
- 使用 SemaphoreSlim 节流
- 不节流
- 在 repo 中使用
connection.OpenAnync()
- 包括/排除 finally block (应该与
using
无关)
值得注意的是,填充器中的 foreach
循环运行了大约 500 次。本质上,有一个包含 500 个帐户的列表。对于每一个,它都需要执行一项长时间运行的 populate
任务,该任务涉及从我的 Foo 存储库中提取数据。
老实说,我不知道。我认为这可能与等待来自填充器任务列表中每个任务的异步数据库调用有关。对此问题的任何见解都会非常有帮助。
最佳答案
经过一些挖掘,我想我设法找出了问题所在。我认为我实际上并没有像我最初假设的那样遇到连接泄漏。据我现在的理解,通过连接池,当 SQL 连接从代码中关闭时,它实际上并没有消失——它只是作为空闲连接进入连接池。在 SQL 中查看打开的连接仍然会显示它。
因为我的数据访问是异步的,所以所有的连接都是在任何“关闭的”连接返回到池之前打开的,这意味着每个请求都会打开一个新连接。这导致了我看到的惊人数量的打开连接,让我假设我有连接泄漏。
使用 SemaphoreSlim 实际上解决了这个问题——我只是错误地实现了它。它应该像这样工作:
public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
{
foreach (item in accountNumbers)
{
trackedTasks.Add(new Func<Task<ProcessResult>>(async () =>
{
await ss.WaitAsync().ConfigureAwait(false);
try
{
return await ProcessAccount(item.AccountCode, popDate ?? DateTime.Today).ConfigureAwait(false);
}
catch (Exception e)
{
//log, etc.
}
finally
{
ss.Release();
}
})());
}
}
这样做会限制一次打开的连接数量,并等待它们关闭,因此池中相同的较小连接组将被重新使用。
关于c# - 使用异步时未处理 SqlConnection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48313087/