spring - Configuring spring.codec.max-in-memory-size when using ReactiveElasticsearchClient

Tags: spring spring-mvc spring-data-elasticsearch

I am using ReactiveElasticsearchClient from spring-data-elasticsearch 3.2.3 with spring-boot 2.2.0. After upgrading to spring-boot 2.2.2, I get org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144.

The suggested fix is to set spring.codec.max-in-memory-size, but I still get the same exception.

Here is the full exception:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
    reactor.core.publisher.Flux.collect(Flux.java:3273)
    org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
    |_     Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
    |_      Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
    |_         Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
    |_         Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
    |_       checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
    |_         Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
    |_    Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
    |_     Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
    |_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
    |_     Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
    |_         Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
    |_      Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
    |_         Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
    |_         Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
        at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
        at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
        at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

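Can anyone tell me what I am doing wrong, or is this a bug?

Thanks

Accepted answer

I ran into the same issue with a plain reactive WebClient (going from 2.1.9 to 2.2.1). I had no luck setting spring.codec.max-in-memory-size, and later found a hint that this is not the right approach anyway: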

… On the client side, the limit can be changed in WebClient.Builder.

(source, including dead link :-S )

I still have not found where WebClient.Builder gets the default 256K limit from (1). However, the following allowed me to raise the buffer size limit to 16M:

// Raise the in-memory buffer limit of the default codecs to 16M.
WebClient.builder()
  .…
  .exchangeStrategies(ExchangeStrategies.builder()
    .codecs(configurer -> configurer
      .defaultCodecs()
      .maxInMemorySize(16 * 1024 * 1024)) // limit in bytes
    .build())
  .build();
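The limit is given as a plain int number of bytes, so the size arithmetic is easy to get wrong for larger values. A minimal sketch of overflow-safe helpers, assuming hypothetical names (CodecLimitSizes, kib, mib) that are not part of Spring:

```java
/** Hypothetical helpers for byte-size limits; not part of any Spring API. */
public class CodecLimitSizes {

    /** Converts kibibytes to bytes, failing loudly on int overflow. */
    public static int kib(int n) {
        return Math.multiplyExact(n, 1024);
    }

    /** Converts mebibytes to bytes, failing loudly on int overflow. */
    public static int mib(int n) {
        return Math.multiplyExact(kib(n), 1024);
    }

    public static void main(String[] args) {
        System.out.println(kib(256)); // 262144, the value in the exception message
        System.out.println(mib(16));  // 16777216, the raised limit used above
    }
}
```

Math.multiplyExact throws an ArithmeticException instead of silently wrapping, so a value such as mib(2048), which does not fit in an int, is rejected rather than turned into a negative limit.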

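So it seems to me (without knowing the intricacies of spring-data-elasticsearch) that if you can somehow get hold of the WebClient that WebClientProvider returns, you should be able to mutate it to include the ExchangeStrategies above.

Perhaps you can provide your own override of DefaultWebClientProvider along these lines (absolutely untested!):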

class MyDefaultWebClientProvider extends DefaultWebClientProvider {
  @Override
  public WebClient get(InetSocketAddress endpoint) {
    return super.get(endpoint)
      .mutate() // Obtain WebClient.Builder instance.
      .exchangeStrategies(ExchangeStrategies.builder()
        .codecs(configurer -> configurer
          .defaultCodecs()
          .maxInMemorySize(16 * 1024 * 1024))
        .build())
      .build();
  }
}

YMMV.

Update #1:

1) Now I have found it, and it explains why setting spring.codec.max-in-memory-size has no effect: the limit is hardcoded at 256K in the base class that all default codecs use; cf. BaseDefaultCodecs.
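To make the failure in the stack trace concrete, here is a plain-Java model of a byte-count-limited buffer. It is only a sketch of the idea behind the LimitedDataBufferList frames in the trace, not Spring's actual implementation; the class and exception names are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a size-limited buffer list; not Spring's real class. */
public class LimitedBufferSketch {

    /** Hypothetical stand-in for DataBufferLimitException. */
    public static class LimitExceededException extends RuntimeException {
        public LimitExceededException(int limit) {
            super("Exceeded limit on max bytes to buffer : " + limit);
        }
    }

    private final List<byte[]> chunks = new ArrayList<>();
    private final int maxBytes;
    private long byteCount;

    public LimitedBufferSketch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Adds a chunk, or throws if the running total would exceed the limit. */
    public void add(byte[] chunk) {
        if (byteCount + chunk.length > maxBytes) {
            throw new LimitExceededException(maxBytes);
        }
        byteCount += chunk.length;
        chunks.add(chunk);
    }

    public long byteCount() {
        return byteCount;
    }

    public static void main(String[] args) {
        // 256 * 1024 = 262144, the default limit reported in the exception.
        LimitedBufferSketch buffer = new LimitedBufferSketch(256 * 1024);
        buffer.add(new byte[200 * 1024]); // fits: 204800 bytes so far
        try {
            buffer.add(new byte[100 * 1024]); // would reach 307200 bytes
        } catch (LimitExceededException e) {
            System.out.println(e.getMessage());
            // prints "Exceeded limit on max bytes to buffer : 262144"
        }
    }
}
```

In this model the whole response body is collected before decoding, so any single response larger than the limit fails, which matches a large _bulk response tripping the 256K default.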

Source: the original question on Stack Overflow: https://stackoverflow.com/questions/59326351/
