java - ServiceBusSessionReceiverAsyncClient throws IllegalStateException during close

Tags: java azure azureservicebus azure-servicebus-queues azure-java-sdk

Receiving a single message from a Service Bus queue with ServiceBusSessionReceiverAsyncClient raises an IllegalStateException. The message says that something tried to add credits to a closed link.

I use take(1) and next() to turn the Flux into a single-result Mono. The documentation says that take(1) on a stream closes it after the first result, which is what I want.

My receiver code:

private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {

        var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
                .sessionReceiver()
                .queueName("my-callback-queue")
                .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                .buildAsyncClient();

        var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(receiver::close)
        );

        return Mono.from(msgStream

                        .timeout(timeout)
                        .take(1)
                        .next()

                ).map(message -> {

                    var json = message.getBody().toString();

                    try {
                        var val = objectMapper.readValue(json, clazz);
                        return val != null ? Optional.of(val) : Optional.<T>empty();
                    } catch (Exception e) {
                        log.error("Error deserializing response from string {}", json, e);
                        return Optional.<T>empty();
                    }
                })
                .doOnError(t -> {
                    if (t instanceof TimeoutException) {
                        log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
                    } else {
                        log.error("Error waiting for async callback", t);
                    }
                }).onErrorReturn(Optional.empty());
    }

This code works, but I get this exception every time it runs:

13:46:27.122 [io-executor-thread-1] INFO  c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO  c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO  c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
**reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr**
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
    at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
    at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr

    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO  c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}

How can I prevent the IllegalStateException from being thrown, or at least handle it?

Best answer

The "IllegalStateException: Cannot add credits to closed link" should not be thrown to the application; it should only be logged.

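It can happen occasionally because several threads run at the same time. One is the non-blocking IO_thread, which processes message frames and sends credits via flow frames. Another is the Worker_thread, which delivers messages to the application. A third is the application's handler_thread, on which the application's responseAsync is called.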

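What happens is that when responseAsync closes the client from [any]_thread, the IO_thread in the background may still be doing some work when the close request arrives. The error appears in the log when the IO_thread is sending a flow frame while other parts of the client are shutting down. This log entry can be ignored.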
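The ordering described above can be reproduced in a small, deterministic simulation. This is only a sketch built on JDK classes: FakeLink is a hypothetical stand-in and not the SDK's ReactorReceiver, and the latch forces the close to happen before the late credit request, which in the real client is a matter of timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class ClosedLinkRace {

    /** Hypothetical stand-in for a receive link; NOT the SDK's ReactorReceiver. */
    static final class FakeLink {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.set(true);
        }

        void addCredits(int credits) {
            if (closed.get()) {
                throw new IllegalStateException("Cannot add credits to closed link");
            }
            // A real link would send a flow frame to the broker here.
        }
    }

    /** Forces "close first, late credit request second" and reports the outcome. */
    static String simulate() {
        FakeLink link = new FakeLink();
        CountDownLatch linkClosed = new CountDownLatch(1);
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            // Background thread: still has one credit request in flight.
            Future<String> lateRequest = background.submit(() -> {
                linkClosed.await();
                try {
                    link.addCredits(1);
                    return "credits added";
                } catch (IllegalStateException e) {
                    // Harmless here: the caller already has its single message.
                    return "ignored: " + e.getMessage();
                }
            });

            // Application thread: got its message, so it closes the link.
            link.close();
            linkClosed.countDown();

            return lateRequest.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

The late request fails only because the link was closed first; the message that the application asked for has already been delivered, which is why the failure needs nothing more than a log entry.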

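It looks like the application is designed to create and dispose a client for every request. That means it opens and closes a TCP connection (to Service Bus) for each request, which can be heavy.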
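One common way to avoid building a client per request is to create it once, lazily, and hand the same instance to every request. The sketch below shows only that pattern with JDK classes; Lazy and the string "client" are hypothetical placeholders. In the application, the factory would presumably wrap the buildAsyncClient() call from the question, assuming the resulting client can safely be shared across requests.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SharedClientDemo {

    /** Hypothetical holder that builds an expensive object once and then reuses it. */
    static final class Lazy<C> {
        private final Supplier<C> factory;
        private volatile C instance;

        Lazy(Supplier<C> factory) {
            this.factory = factory;
        }

        C get() {
            C local = instance;
            if (local == null) {
                synchronized (this) {
                    local = instance;
                    if (local == null) {
                        // First caller pays the construction cost.
                        local = factory.get();
                        instance = local;
                    }
                }
            }
            return local;
        }
    }

    /** Simulates three requests and returns how many times the factory ran. */
    static int buildsAfterThreeRequests() {
        AtomicInteger builds = new AtomicInteger();
        // Placeholder factory; stands in for the expensive client construction.
        Lazy<String> sharedClient = new Lazy<>(() -> "client-" + builds.incrementAndGet());
        for (int request = 0; request < 3; request++) {
            sharedClient.get();
        }
        return builds.get();
    }

    public static void main(String[] args) {
        System.out.println(buildsAfterThreeRequests());
    }
}
```

With this shape, each request would still accept and close its own session, while the underlying connection is set up only once.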

Source: Stack Overflow, https://stackoverflow.com/questions/75892274/
