c# - 使用 ServiceBus SubscriptionClient 时出现 RenewLock 错误

标签 c# azure .net-core azureservicebus servicebus

我正在 .net Core 2.1 中编写一个控制台应用程序,我的目的是监听 ServiceBus 中某个主题的消息,并使用 NEST api 处理到达 Elasticsearch 的新消息(NEST 可能与我的问题无关,但我想保持透明)。

我在 ServiceBus 中的主题实体称为“test”,我有一个订阅也称为“test”(完整路径为“test/subscriptions/test”)。

在我的 .net Core 控制台应用程序中,我有以下 NuGet 引用:

<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />

在使用 .net Standard ServiceBus Api 时,我遇到一个非常奇怪的问题,经常出现更新锁定错误:

Message handler encountered an exception Microsoft.Azure.ServiceBus.MessageLockLostException

我已将代码剥离回一个非常可重现的示例:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;

namespace SampleApp
{
    public class Program
    {

    private static SubscriptionClient _subscriptionClient;
    private static IElasticClient _elasticClient;

    private static string ServiceBusConnectionString = "[connectionString]";
    private static string TopicName = "test";
    private static string SubscriptionName = "test";

    public static void Main(string[] args)
    {
        var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
        _elasticClient = new ElasticClient(elasticsearchSettings);

        _subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);

        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
            MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };

        // Register the function that processes messages.
        _subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

        Console.WriteLine("INFO: Process message handler registered, listening for messages");
        Console.Read();
    }

    private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Message received.
        var content = Encoding.UTF8.GetString(message.Body);

        var messageBody = JsonConvert.DeserializeObject<string[]>(content);

        Console.WriteLine($"INFO: Message arrived: {message}");
        Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
        try
        {
            var response = _elasticClient.Ping();

            if (!response.IsValid && response.OriginalException != null)
                Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
            else
                Console.WriteLine("INFO: ElasticSearch was contacted successfully");
        }
        catch (Exception e)
        {
            Console.WriteLine("!ERROR!: " + e);
        }

        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        Console.WriteLine("INFO: Message completed");
    }

    // Use this handler to examine the exceptions received on the message pump.
    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
        return Task.CompletedTask;
    }

}

此代码与此处的示例几乎相同: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

我故意“ping”一个不存在的 Elasticsearch 实例,以产生套接字异常,以帮助我重现问题。

我注意到的一件事是,当我创建一个新主题并设置 EnabledPartioning = false 时,问题不会发生。

有人见过这个吗?似乎是 ServiceBus 代码本身的深层问题。

注意:我尝试使用接收器使用“ReceiveAsync”读取消息,在这种情况下我也收到此错误。另外,我的测试驱动程序是将 .net Framework ServiceBus 客户端(它与分区一起使用)移至 .net Core 版本。

提前感谢您的指点!!

最佳答案

在我上面的例子中,问题是由于对我的配置的轻微误解造成的。 在 Azure 中,如果您导航到:

资源组 > ServiceBusInstance > 主题 > testTopic > testSubscription

您可以找到订阅属性。在这里您将看到发送消息时锁定的持续时间。默认为 60 秒,但我将长时间运行的过程延长到最长 5 分钟,如下所示:

enter image description here

然后在代码中,当连接我的订阅客户端的属性时,我需要确保 MaxAutoRenewDuration属性设置正确。

我假设这个属性意味着,如果你为此定义了 30 秒,那么在内部,订阅客户端将每 30 秒更新一次锁定,因此,如果你的最大到期时间是 5 分钟,那么锁定将更新只要当您处理消息时...

事实上,该属性的实际含义是订阅客户端内部发生锁定更新的最大时间。

因此,如果您将其设置为 24 小时,例如Timespan.FromHours(24)如果您的处理需要 12 小时,则会更新。但是,如果您使用 Timespan.FromHours(12) 将其设置为 12 小时你的代码运行了 24 秒,当你去完成消息时,它会给出一个 lockLost 异常(因为我在较短的时间间隔内得到了上面的结果!)。

我做过的一件事很容易实现,就是动态拉动 LockDuration从运行时的订阅属性(我的所有主题可能有不同的配置)并应用 MaxAutoRenewDuration适本地使用这个。

代码示例:

sbNamespace.Topics.GetByName(“test”).Subscriptions.GetByName(“test”).LockDurationInSeconds

注意 - 我正在使用 Azure.Management.Fluent 包来构建 sbNamespace。

希望对其他人有帮助!

关于c# - 使用 ServiceBus SubscriptionClient 时出现 RenewLock 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53853306/

相关文章:

c# - “包含”方法返回 false

c# - 图像源和缓存

C# Parallel.For 不执行每一步

azure - powerBI azure 客户端应用程序对其他应用程序的报告嵌入权限

azure - 在应用服务前面配置WAF应用程序网关

c# - asp net core app出现MSB3277怎么办

docker - 通过 docker 容器从控制台应用程序连接到 redis

c# - 如何从编码的 UI 测试创建可执行文件(最好是 .exe 而不是命令行)?

c# - 使用Elasticsearch 7.1的NEST 7.0 alpha 2中的SortField

azure - 从 ARM 模板部署事件网格订阅时出现 "Resource cannot be updated during provisioning"错误