我有大约 50K 条消息想要添加到 Azure 队列中。
我不确定我的代码是否安全。感觉/闻起来很糟糕。
基本上,给出 POCO 的集合,将 POCO 序列化为某个 json,然后将该 json 文本添加到队列中。
public void AddMessage(T content)
{
content.ShouldNotBe(null);
var json = JsonConvert.SerializeObject(content);
var message = new CloudQueueMessage(json);
Queue.AddMessage(message);
}
public void AddMessages(ICollection<T> contents)
{
contents.ShouldNotBe(null);
Parallel.ForEach(contents, AddMessage);
}
有人可以告诉我应该做什么来解决这个问题 - 最重要的是,为什么?
我觉得在这种情况下队列可能不是线程安全的。
最佳答案
我观察到的关于 Parallel.ForEach 和处理 Azure 存储的一些事情(我的经验是并行上传 blob/ block ):
- Azure 存储操作是基于网络 (IO) 的操作,而不是处理器密集型操作。如果我没记错的话,
Parallel.ForEach
更适合处理器密集型应用程序。 - 使用
Parallel.ForEach
上传大量 blob(或 block )时我们注意到的另一件事是,我们开始出现大量Timeout
异常,并且速度实际上减慢了整个操作下来。我相信这样做的原因是,当您使用这种方法迭代包含大量项目的集合时,您实际上是在处理对底层框架的控制,该框架决定如何处理该集合。在这种情况下,将发生大量上下文切换
,这会减慢操作速度。考虑到有效负载较小,不确定这在您的场景中如何工作。
我的建议是让应用程序控制它可以生成的并行线程的数量。一个好的标准是逻辑处理器的数量。另一个好的标准是 IE 可以打开的端口数量。所以你会产生那么多数量的并行线程。然后,您可以等待所有线程完成以生成下一组并行线程,或者在一个任务完成后立即启动一个新线程。
伪代码:
ICollection<string> messageContents;
private void AddMessages()
{
int maxParallelThreads = Math.Min(Environment.ProcessorCount, messageContents.Count);
if (maxParallelThreads > 0)
{
var itemsToAdd = messageContents.Take(maxParallelThreads);
List<Task> tasks = new List<Task>();
for (var i = 0; i < maxParallelThreads; i++)
{
tasks.Add(Task.Factory.StartNew(() =>
{
AddMessage(itemsToAdd[i]);
RemoveItemFromCollection();
}));
}
Task.WaitAll(tasks.ToArray());
AddMessages();
}
}
关于multithreading - 尝试将 AddMessage 批量添加到 Azure 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22903312/