multithreading - 尝试将 AddMessage 批量添加到 Azure 队列

标签 multithreading azure queue batch-processing

我有大约 50K 条消息想要添加到 Azure 队列中。

我不确定我的代码是否安全。感觉/闻起来很糟糕。

基本上,给出 POCO 的集合,将 POCO 序列化为某个 json,然后将该 json 文本添加到队列中。

public void AddMessage(T content)
{
    content.ShouldNotBe(null);

    var json = JsonConvert.SerializeObject(content);
    var message = new CloudQueueMessage(json);
    Queue.AddMessage(message);
}

public void AddMessages(ICollection<T> contents)
{
    contents.ShouldNotBe(null);
    Parallel.ForEach(contents, AddMessage);
}

有人可以告诉我应该做什么来解决这个问题 - 最重要的是,为什么?

我觉得在这种情况下队列可能不是线程安全的。

最佳答案

我观察到的关于 Parallel.ForEach 和处理 Azure 存储的一些事情(我的经验是并行上传 blob/ block ):

  • Azure 存储操作是基于网络 (IO) 的操作,而不是处理器密集型操作。如果我没记错的话,Parallel.ForEach 更适合处理器密集型应用程序。
  • 使用 Parallel.ForEach 上传大量 blob(或 block )时我们注意到的另一件事是,我们开始出现大量 Timeout 异常,并且速度实际上减慢了整个操作下来。我相信这样做的原因是,当您使用这种方法迭代包含大量项目的集合时,您实际上是在处理对底层框架的控制,该框架决定如何处理该集合。在这种情况下,将发生大量上下文切换,这会减慢操作速度。考虑到有效负载较小,不确定这在您的场景中如何工作。

我的建议是让应用程序控制它可以生成的并行线程的数量。一个好的标准是逻辑处理器的数量。另一个好的标准是 IE 可以打开的端口数量。所以你会产生那么多数量的并行线程。然后,您可以等待所有线程完成以生成下一组并行线程,或者在一个任务完成后立即启动一个新线程。

伪代码:

    ICollection<string> messageContents;
    private void AddMessages()
    {
        int maxParallelThreads = Math.Min(Environment.ProcessorCount, messageContents.Count);
        if (maxParallelThreads > 0)
        {
            var itemsToAdd = messageContents.Take(maxParallelThreads);
            List<Task> tasks = new List<Task>();
            for (var i = 0; i < maxParallelThreads; i++)
            {
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    AddMessage(itemsToAdd[i]);
                    RemoveItemFromCollection();
                }));
            }
            Task.WaitAll(tasks.ToArray());
            AddMessages();
        }
    }

关于multithreading - 尝试将 AddMessage 批量添加到 Azure 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22903312/

相关文章:

c# - 如何唤醒休眠线程?

java - 同步对类字段的访问

azure - 识别 Azure 服务总线队列的使用者

c - C 中的信号排队

api - Flutter 队列 API 请求稍后执行

java - JSVC Java Daemon 需要改进性能

c++ - 是否可以编写可以在 HPX 和 C++1x 线程之间切换的代码?

azure - 一个阶段的输出不可用于后续阶段

azure - 如何登录 Azure 服务主体

c - Lipipq(iptables)。如何使用 iptables 队列将捕获的数据包重定向到另一个地址?