spring-webflux - Spring reactive WebClient fails with "WebClientRequestException: Pending acquire queue has reached its maximum size of 1000"

Tags: spring-webflux spring-webclient reactor-netty

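I am load testing a microservice API that calls another microservice's API through the Spring reactive WebClient. I am driving the test from the Postman Runner tab.
First, I ran the load with 1500 iterations, calling the second microservice once per request, and everything worked as expected.
But when I ran the load with 5000 iterations, the second microservice was called 3500 times and the remaining 1500 calls failed with this error: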

WebClientRequestException: Pending acquire queue has reached its maximum size of 1000


I am using org.springframework.web.reactive.function.client.WebClient with its default configuration. Here is the code snippet:
    private WebClient webClient;

    @PostConstruct
    public void init() {
        this.webClient = WebClient.builder()
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }
What can be done to avoid this?
I am using the latest spring-boot-starter-parent dependency (version 2.5.3) and spring-webflux-5.3.9.jar.
Logs:
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

**Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000**
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Request to POST http://172.20.0.2:3130/v1/login/mobile [DefaultWebClient]
Stack trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
                at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:444)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
                at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
                at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:216)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:269)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
                at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
                at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
                at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
**Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)**
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)

Best answer

WebClient needs an HTTP client library to perform requests; by default it uses Reactor Netty.
Quoting the Reactor Netty reference docs:

By default, Reactor Netty client uses a “fixed” connection pool with 500 as the maximum number of active channels and 1000 as the maximum number of further channel acquisition attempts allowed to be kept in a pending state (for the rest of the configurations check the system properties or the builder configurations below). This means that the implementation creates a new channel if someone tries to acquire a channel as long as less than 500 have been created and are managed by the pool. When the maximum number of channels in the pool is reached, up to 1000 new attempts to acquire a channel are delayed (pending) until a channel is returned to the pool again, and further attempts are declined with an error.


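What you are seeing is that all 500 connections in the connection pool are in active use and the pending queue is already full with 1000 waiting acquisition requests, so every further request is rejected with this error.
You have 2 options to resolve this:
Vertical scaling
Increase the connection pool size and/or the pending acquire queue length.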
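
To make the admission rule from the quoted docs concrete, here is a minimal sketch in plain Java. It is a toy model, not Reactor Netty's actual pool code: the class name `FixedPoolModel`, its methods, and the FIFO hand-off on release are assumptions made for illustration. It models a worst-case burst in which no connection is returned while requests keep arriving, so the counts it prints will differ from a real run, where connections are released along the way.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the admission rule quoted above; NOT Reactor Netty's real pool code. */
public class FixedPoolModel {

    public enum Outcome { ACQUIRED, PENDING, REJECTED }

    private final int maxConnections;          // plays the role of maxConnections (500 by default)
    private final int pendingAcquireMaxCount;  // plays the role of pendingAcquireMaxCount (1000 by default)
    private final Deque<Integer> pending = new ArrayDeque<>(); // waiting request ids, oldest first
    private int active = 0;

    public FixedPoolModel(int maxConnections, int pendingAcquireMaxCount) {
        this.maxConnections = maxConnections;
        this.pendingAcquireMaxCount = pendingAcquireMaxCount;
    }

    /** Applies the rule: take a connection, else wait in the queue, else fail. */
    public Outcome acquire(int requestId) {
        if (active < maxConnections) {
            active++;
            return Outcome.ACQUIRED;
        }
        if (pending.size() < pendingAcquireMaxCount) {
            pending.addLast(requestId);
            return Outcome.PENDING;
        }
        return Outcome.REJECTED; // the real client raises PoolAcquirePendingLimitException here
    }

    /** Returns a connection; the oldest waiter (if any) takes it over. Returns that waiter's id, or -1. */
    public int release() {
        Integer waiter = pending.pollFirst();
        if (waiter != null) {
            return waiter; // connection changes hands, so the active count is unchanged
        }
        if (active > 0) {
            active--;
        }
        return -1;
    }

    /** Fires all requests at once with no releases; returns counts indexed by Outcome.ordinal(). */
    public static int[] burst(int requests, int maxConnections, int pendingAcquireMaxCount) {
        FixedPoolModel pool = new FixedPoolModel(maxConnections, pendingAcquireMaxCount);
        int[] counts = new int[Outcome.values().length];
        for (int id = 0; id < requests; id++) {
            counts[pool.acquire(id).ordinal()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = burst(5000, 500, 1000);
        System.out.println("acquired=" + c[0] + " pending=" + c[1] + " rejected=" + c[2]);
        // prints "acquired=500 pending=1000 rejected=3500"
    }
}
```

In the question only 1500 of the 5000 calls failed rather than 3500, which is consistent with connections being handed back to waiters during the run; the model shows why rejections start only once both limits are exhausted.
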
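import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

// Replace the <...> placeholders with your own values before compiling.
ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
        .maxConnections(<your_desired_max_connections>)            // 500 by default, per the quote above
        .pendingAcquireMaxCount(<your_desired_pending_queue_size>) // 1000 by default, per the quote above
        .build();
ReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient webClient = WebClient.builder()
        .clientConnector(clientHttpConnector)
        .build();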
Horizontal scaling
Run additional instances of your application and load-balance the API calls across them.
Spring reference docs
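Additional notes:
When sizing the connection pool, it is worth taking the latency of the downstream API calls into account. A good starting point is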

connection_pool_size = tps * downstream_api_latency


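where tps is transactions per second and downstream_api_latency is expressed in seconds, so that the units cancel.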
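
As a worked example of the sizing formula above, here is a small sketch in plain Java. The class name `PoolSizing`, the method name, and the sample numbers are hypothetical; the latency is taken in milliseconds (an assumption made here to keep the arithmetic in integers) and the result is rounded up to a whole connection.

```java
/** Worked example of connection_pool_size = tps * downstream_api_latency. */
public class PoolSizing {

    /**
     * Computes the starting pool size, rounded up.
     *
     * @param tps           expected transactions per second (hypothetical input)
     * @param latencyMillis typical downstream latency in milliseconds (hypothetical input)
     */
    public static long connectionPoolSize(long tps, long latencyMillis) {
        if (tps < 0 || latencyMillis < 0) {
            throw new IllegalArgumentException("tps and latencyMillis must be non-negative");
        }
        // tps * (latencyMillis / 1000), computed in integers with ceiling division
        return (tps * latencyMillis + 999) / 1000;
    }

    public static void main(String[] args) {
        System.out.println(connectionPoolSize(2000, 250)); // prints 500
        System.out.println(connectionPoolSize(100, 45));   // prints 5
    }
}
```

For instance, 2000 requests per second against a downstream API that answers in about 250 ms keeps roughly 500 connections busy at any moment, which is exactly the default limit. Treat the result as a starting point to validate under load, not as a guarantee.
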

Source: Stack Overflow question https://stackoverflow.com/questions/68640474/
