c# - 如何修复仅限于从Azure ServiceBus读取1个客户端的问题

标签 c# azure queue azure-web-sites servicebus

我有一些东西对我来说是头疼的...

我有一个小型服务,该服务从Azure ServiceBus队列读取消息并将数据存储在CosmosDB集合中。

问题是我无法扩展我的服务。我已经能够优化事情,以提高该服务的一个实例每秒读取的消息数。但是,添加更多服务实例会稍微降低每秒每秒读取的消息总数!

重要的是要注意,将消息批量发送到队列的工作就像一种魅力,我每秒可以将1000-2000条消息发送到队列而没有任何问题。从队列中读取是个问题。

我的处理程序需要占用大量CPU,并且消息的大小范围从大约2 KB到900 KB,平均大约25 KB。我现在有一个实例可以每秒处理大约41.5条消息。

如果我添加服务的第二个实例(顺便说一句,它是一个Azure Web App),则所有实例每秒读取的消息总数下降到大约40。添加另一个实例将其减少到接近38。

从队列中读取消息(并处理重试,死信等)的实际代码是公司内部框架的一部分,许多其他服务都在使用该框架,而这些服务都没有这个问题。其他服务具有预期的行为,即性能随服务实例的数量线性增加(显然,达到ServiceBus可以处理的最大值)。

我在两个都使用Premium ServiceBus层的不同Azure订阅(TEST和PROD)上遇到相同的问题。

我不在队列上使用会话。

这里有没有人遇到过类似的问题,您是如何解决的?

我尝试过的事情:


仅切换与删除内容有关的代码
而是从blob存储读取代码的ServiceBus。这给了我
CosmosDB的吞吐量提高了几个数量级(大约
至今已有15,000份书面文件。第二个vs. 41.6。
ServiceBus)。通过横向扩展获得了如此高的吞吐量,这是我在使用ServiceBus时遇到的问题,因此CosmosDb绝对不是瓶颈。
我尝试删除并重新创建队列,以及调整代码中的各种内容-但从逻辑上看,在我看来,我在代码中所做的任何事情都只能影响单个服务实例的性能。
我也尝试在Azure中运行该服务的同时在我的计算机上本地运行该服务。从Splunk日志中,我可以看到,在代表性的一分钟内,该服务的Azure实例处理了1371条消息,而该服务的本地实例仅处理了23条消息。因此,正如我一直在说的那样,这里似乎存在某种僵局或正在发生的事情。进一步的分析表明,Azure中的实例平均花费247毫秒来处理一条消息,而本地实例平均花费66秒!如果锁过期或发生未处理的异常,则会将一条消息放回到队列中,并在10次失败的传递尝试后进行死信签名。因此,似乎大多数本地处理的消息都失败了,并被放回到队列中,然后最终由Azure实例处理(这是我的猜测)。


Web应用程序实例之间唯一共享的资源是ServiceBus和CosmosDb,如上所述,我已经排除了CosmosDb。但是,看到我的TEST和PROD订阅都遇到了相同的问题(我们的DEV订阅不允许横向扩展),并且我尝试以各种不同的方式重新创建队列几次,因此无法是队列本身,并且在同一ServiceBus实例上使用的其他队列均没有此问题。

如预期的那样,调整/优化代码仅对一个实例的性能产生影响。据我所知,排除了外部瓶颈的可能性。剩下的一件事是我们的内部框架,该框架处理从队列中实际读取的消息,还被以下事实排除了:该框架的完全相同的版本已在许多其他Web应用程序中使用,事实证明该扩展可以工作。

我觉得在这里很合适...

解决方案:忘记更新此问题,所以终于到了……我们最终设法留出时间完全专注于此问题,并且通过各种测试,我们得出结论,这是在SDK中使用ReadBatchAsync方法的组合并有大量邮件是导致此问题的原因。切换为使用OnMessageAsync修复了它。

最佳答案

进行async void操作通常不是一个好主意。

此外,您还可以重构要分批调用的处理。

第一种方法假定无法使StartProcessMessage异步

void StartProcessMessage(Message m) {
    //...
}

public async Task Start() {
    while (true) {
        var messages = (await _queueClient.ReceiveBatchAsync(Math.Max(1, _configuration.MaxConcurrentCalls - _messagesInProgress))).ToArray();
        Interlocked.Add(ref _messagesInProgress, messages.Length);
        var tasks = messages.Select(m => Task.Run(() => StartProcessMessage(m)));
        await Task.WhenAll(tasks); //process in parallel.
        while (_messagesInProgress > _configuration.MaxConcurrentCalls) {
            await Task.Delay(100);
        }
    }
}


第二种方法假定StartProcessMessage可以重构为异步

Task StartProcessMessage(Message m) {
    //...
}

public async Task Start() {
    while (true) {
        var messages = (await _queueClient.ReceiveBatchAsync(Math.Max(1, _configuration.MaxConcurrentCalls - _messagesInProgress))).ToArray();
        Interlocked.Add(ref _messagesInProgress, messages.Length);
        var tasks = messages.Select(m => StartProcessMessage(m));
        await Task.WhenAll(tasks); //process in parallel.
        while (_messagesInProgress > _configuration.MaxConcurrentCalls) {
            await Task.Delay(100);
        }
    }
}

关于c# - 如何修复仅限于从Azure ServiceBus读取1个客户端的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51250329/

相关文章:

azure - 使用 Windows 身份验证将 Azure 应用服务连接到 Azure SQL 数据库

azure - 在 Azure 应用服务中添加自定义域时如何跳过验证

java - 如何在java中将队列的大小加倍

database - 有没有办法让 Celery/RabbitMQ 持久化?

c# - Lambda 如何按列表升序对元素重新排序并将空值放在后面?

c# - 在 async/await 中使用 ThreadStatic 变量

c# - 循环创建 MS Word 内容控件

c# - 如何将 WPF ScrollViewer 的所有内容保存为图像

c# - 返回目录 azure 文件存储中的 cloudfiles 字符串数组 C#

php - 为什么 Beanstalkd 队列中的作业失败 - Laravel 4.2