我正在开发一项服务,该服务负责记录发送到我们服务的请求。该服务正在离线工作(正在被解雇并忘记)。
我们根据一些输入参数(产品 ID)将请求保存到不同的数据库。我们不想每次有人发出请求时都保存到数据库中 - 我们更愿意构建一些要插入的“批处理”并执行 InsertMany
每 N
数量时间(比方说 10 秒)。我已经开始实现它,现在我正在为两件事而苦苦挣扎:
- 我需要使用
ConcurrentDictionary
吗?看来我会用普通词典达到同样的效果 - 如果以上问题的答案是“否,在您的情况下
ConcurrentDictionary
没有任何好处”- 是否有办法重写我的代码以“正确”使用ConcurrentDictionary
这样我就可以避免使用锁并确保AddOrUpdate
不会与清除批处理发生“冲突”?
让我粘贴片段并进一步解释:
// dictionary where key is ProductId and value is a list of items to insert to that product database
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
lock (_productDetails)
{
foreach (var item in _productDetails)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
我主要担心的是没有锁会发生以下情况:
SaveRequestsToDatabase
已触发 - 并开始处理数据- 就在
SaveRequestsToDatabase
函数中调用item.Value.Clear();
之前,外部服务触发了另一个SaveRecentRequest
函数,该函数执行AddOrUpdate
使用相同的键 - 这将向集合添加请求 SaveRequestsToDatabase
正在完成并因此清除集合 - 但最初由 2 添加的对象不在集合中,因此未被处理
最佳答案
通常,并发问题源于没有首先选择正确的数据结构。
在您的情况下,您有两个工作流程:
- n 个生产者,并发且连续地对事件进行排队
- 1 个消费者,在给定时间出列和处理事件
您的问题是您正试图立即对事件进行分类,即使这不是必需的。在并发部分将事件保持为简单流,并仅在消费者部分对它们进行排序,因为那里没有并发。
ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.Enqueue((token, details));
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
var products = new List<(string token, QuoteDetails details)>();
while (_productDetails.TryDequeue(out var item))
{
products.Add(item);
}
foreach (var group in products.GroupBy(i => i.token, i => i.Details))
{
SaveProductRequests(group.Key, group);
}
}
public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
关于C# Concurent 字典 - 锁定值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45102034/