c# - 使用 Azure 服务总线队列监听器定期收到 RenewToken 上的未经授权的访问错误

标签 c# azure asp.net-core .net-core azureservicebus

我的队列接收器定期收到 Microsoft.Azure.ServiceBus.ServiceBusException(下面的消息已删除敏感信息)。 SAS 键具有发送/监听访问权限,并且随着处理正常进行,该错误似乎无关紧要。然而,该消息在我的仪表板中造成了信噪比问题(每天收到 10-70 个错误)。关于为什么会发生这种情况有什么想法吗?监听器正在 Azure 应用服务中运行,但我认为这并不重要。我已经调整了重试逻辑,以使用 RetryExponential 进行 1 秒到 1 分钟的退避,重试 5 次。

Request for guidance from SDK developers

套餐

Net Core 3.1

Microsoft.Azure.ServiceBus,版本=4.1.3.0,文化=中性,PublicKeyToken=7e3416​​7dcc6d6d8c

错误消息

The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04 The link 'xxx;xxx:xxx:xxx:source(address:xxx):xxx' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim(s) are required to perform this operation. Resource: 'sb://xxx.servicebus.windows.net/xxx'.. TrackingId:xxx, SystemTracker:xxx, Timestamp:2020-04-27T09:36:04

来源

internal delegate TClient ClientFactory<out TClient>(string connectionString, string entityPath,
    RetryPolicy retryPolicy);

internal delegate Task OnMessageCallback<in TMessage>(TMessage message,
    CancellationToken cancellationToken = default) where TMessage : ICorrelative;

internal sealed class ReceiverClientWrapper<TMessage> : IReceiverClientWrapper<TMessage>
    where TMessage : ICorrelative
{
    // ReSharper disable once StaticMemberInGenericType
    private static readonly Regex TransientConnectionErrorRegex =
        new Regex(
            @"(The link '([a-f0-9-]+);([0-9]*:)*source\(address:([a-z0-9_]+)\):([a-z0-9_]+)' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim\(s\) are required to perform this operation. Resource: 'sb:\/\/([a-z0-9-_.\/]+)'.. TrackingId:([a-z0-9_]+), SystemTracker:([a-z0-9]+), Timestamp:([0-9]{4}(-[0-9]{2}){2}T([0-9]{2}:){2}[0-9]{2}) )+",
            RegexOptions.Compiled | RegexOptions.Multiline | RegexOptions.IgnoreCase);

    private readonly IReceiverClient _receiverClient;
    private readonly IMessageConverter<TMessage> _messageConverter;
    private readonly ILogger _logger;
    private readonly int _maximumConcurrency;

    public ReceiverClientWrapper(IReceiverClient receiverClient, IMessageConverter<TMessage> messageConverter,
        ILogger logger, int maximumConcurrency)
    {
        _receiverClient = receiverClient;
        _messageConverter = messageConverter;
        _logger = logger;
        _maximumConcurrency = maximumConcurrency;
    }

    public Task SubscribeAsync(OnMessageCallback<TMessage> onMessageCallback,
        OnFailureCallback onFailureCallback, CancellationToken cancellationToken = default)
    {
        var messageHandlerOptions = CreateMessageHandlerOptions(onFailureCallback, cancellationToken);

        async Task Handler(Message message, CancellationToken token)
        {
            var convertedMessage = _messageConverter.Convert(message);

            await onMessageCallback(convertedMessage, cancellationToken);
            await _receiverClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        _receiverClient.RegisterMessageHandler(Handler, messageHandlerOptions);

        return Task.CompletedTask;
    }

    private MessageHandlerOptions CreateMessageHandlerOptions(OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken)
    {
        async Task HandleExceptionAsync(ExceptionReceivedEventArgs arguments)
        {
            var exception = arguments.Exception;

            if (TransientConnectionErrorRegex.IsMatch(exception.Message))
            {
                _logger.LogWarning(exception, @"Transient connectivity error occurred");

                return;
            }

            await onFailureCallback(exception, cancellationToken);
        }

        return new MessageHandlerOptions(HandleExceptionAsync)
        {
            AutoComplete = false,
            MaxConcurrentCalls = _maximumConcurrency
        };
    }

    public async ValueTask DisposeAsync()
    {
        await _receiverClient.CloseAsync();
    }
}

internal sealed class SenderClientWrapper<TMessage> : ISenderClientWrapper<TMessage> where TMessage : ICorrelative
{
    private readonly ISenderClient _senderClient;
    private readonly IMessageConverter<TMessage> _messageConverter;

    public SenderClientWrapper(ISenderClient senderClient, IMessageConverter<TMessage> messageConverter)
    {
        _senderClient = senderClient;
        _messageConverter = messageConverter;
    }

    public Task SendAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        var internalMessage = _messageConverter.Convert(message);

        return _senderClient.SendAsync(internalMessage);
    }

    public Task SendAsync(IEnumerable<TMessage> messages, CancellationToken cancellationToken = default)
    {
        var internalMessages = messages
            .Select(_messageConverter.Convert)
            .ToImmutableArray();

        return _senderClient.SendAsync(internalMessages);
    }

    public async ValueTask DisposeAsync()
    {
        await _senderClient.CloseAsync();
    }
}

internal abstract class AbstractClientWrapperFactory
{
    private const int MaximumRetryCount = 5;
    private static readonly TimeSpan MinimumRetryBackOff = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan MaximumRetryBackOff = TimeSpan.FromMinutes(1);

    protected AbstractClientWrapperFactory(IOptions<MessageBusConfiguration> options)
    {
        Options = options;
    }

    protected IOptions<MessageBusConfiguration> Options { get; }

    protected static string GetEntityPath<TMessage>() where TMessage : class
    {
        var messageAttribute = typeof(TMessage).GetCustomAttribute<AbstractMessageAttribute>();

        if (messageAttribute == null)
        {
            throw new ArgumentException($@"Message requires {nameof(AbstractMessageAttribute)}");
        }

        return messageAttribute.EntityName;
    }

    protected TClient CreateClientEntity<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class
    {
        var entityPath = GetEntityPath<TMessage>();
        var retryPolicy = CreateRetryPolicy();

        return clientFactory(Options.Value.ConnectionString, entityPath, retryPolicy);
    }

    protected static IQueueClient QueueClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new QueueClient(connectionString, entityPath, retryPolicy: retryPolicy);
    }

    private static RetryPolicy CreateRetryPolicy()
    {
        return new RetryExponential(MinimumRetryBackOff, MaximumRetryBackOff, MaximumRetryCount);
    }
}

internal sealed class SenderClientWrapperFactory : AbstractClientWrapperFactory, ISenderClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;

    public SenderClientWrapperFactory(IMessageConverterFactory messageConverterFactory,
        IOptions<MessageBusConfiguration> options) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
    }

    public ISenderClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateWrapper<TEvent, ITopicClient>(TopicClientFactory);
    }

    public ISenderClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private ISenderClientWrapper<TMessage> CreateWrapper<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : ISenderClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new SenderClientWrapper<TMessage>(clientEntity, messageConverter);
    }

    private static ITopicClient TopicClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new TopicClient(connectionString, entityPath, retryPolicy);
    }
}

internal sealed class ReceiverClientWrapperFactory : AbstractClientWrapperFactory, IReceiverClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;
    private readonly ILogger<ReceiverClientWrapperFactory> _logger;

    public ReceiverClientWrapperFactory(IOptions<MessageBusConfiguration> options,
        IMessageConverterFactory messageConverterFactory,
        ILogger<ReceiverClientWrapperFactory> logger) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
        _logger = logger;
    }

    public IReceiverClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateReceiverClientWrapper<TEvent, ISubscriptionClient>(SubscriptionClientFactory);
    }

    public IReceiverClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateReceiverClientWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private IReceiverClientWrapper<TMessage> CreateReceiverClientWrapper<TMessage, TClient>(
        ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : IReceiverClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new ReceiverClientWrapper<TMessage>(clientEntity, messageConverter, _logger,
            Options.Value.MaximumConcurrency);
    }

    private ISubscriptionClient SubscriptionClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new SubscriptionClient(connectionString, entityPath, Options.Value.SubscriberName,
            retryPolicy: retryPolicy);
    }
}

internal sealed class RequestService<TRequest> : IRequestService<TRequest> where TRequest : class, IRequest
{
    private readonly Lazy<ISenderClientWrapper<TRequest>> _senderClient;
    private readonly Lazy<IReceiverClientWrapper<TRequest>> _receiverClient;

    public RequestService(ISenderClientWrapperFactory senderClientWrapperFactory,
        IReceiverClientWrapperFactory receiverClientWrapperFactory)
    {
        _senderClient =
            new Lazy<ISenderClientWrapper<TRequest>>(senderClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);

        _receiverClient
            = new Lazy<IReceiverClientWrapper<TRequest>>(receiverClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);
    }

    public Task PublishRequestAsync(TRequest requestMessage, CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessage, cancellationToken);
    }

    public Task PublishRequestAsync(IEnumerable<TRequest> requestMessages,
        CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessages, cancellationToken);
    }

    public Task SubscribeAsync(OnRequestCallback<TRequest> onRequestCallback, OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken = default)
    {
        return _receiverClient
            .Value
            .SubscribeAsync((message, token) => onRequestCallback(message, cancellationToken), onFailureCallback,
                cancellationToken);
    }

    public async ValueTask DisposeAsync()
    {
        if (_senderClient.IsValueCreated)
        {
            await _senderClient.Value.DisposeAsync();
        }

        if (_receiverClient.IsValueCreated)
        {
            await _receiverClient.Value.DisposeAsync();
        }
    }

    public Task ThrowIfNotReadyAsync(CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(ImmutableArray<TRequest>.Empty, cancellationToken);
    }
}

最佳答案

使用 original Nuget package 从未解决此问题,但是新的Azure.Messaging.ServiceBus package似乎没有这个问题。我已经选择转向这一点。

关于c# - 使用 Azure 服务总线队列监听器定期收到 RenewToken 上的未经授权的访问错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61459576/

相关文章:

c# - 将值解析为可为空的枚举

c# - 点之间的数字子串

azure - 表达式应返回与 Azure 数据工厂中先前表达式 (iif) 相同的类型 'integer'

c# - 避免在带有 Serilog 的 AWS Lambda 上使用 netcore2.0 记录两次

c# - 如何检测程序(进程)是否正在运行并在 Windows 8.1 通用应用程序中获取其实例

c# - Jwt 代码不适用于 .NET Core 2

asp.net-core - 移动特定 View /设备检测

c# - 如何在asp.net core 2.2中注册IFileProvider进行依赖注入(inject)?

azure - 如何修复 Azure DevOps 发布管道中的错误 "Error Code: ERROR_DESTINATION_INVALID"

visual-studio - VS2013测试代理和 Controller 不通信