java - Spring Gateway AsyncPredicate 不适用于 react 堆和通量

标签 java spring-boot maven spring-webflux spring-reactor

我们为 Spring-Gateway 编写了一个自定义 Predicate 工厂来路由请求。我们正在解析 XML 请求的主体,然后根据主体中存在的特定方法导出路由。在执行此操作时,我们编写了以下代码来创建 ServerRquest。

@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            Class<String> inClass = String.class;

            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

            if (cachedBody != null) {
                try {
                    boolean test = config.pattern.matcher((String) cachedBody).matches();
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                } catch (ClassCastException e) {
                    LOG.error("Predicate test failed because String.class does not match the cached body object", e);
                }
                return Mono.just(false);
            } else {

                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

            }
        };
    }

使用旧版本的 Spring-Boot-Parent (2.1.7.RELEASE) 和 spring-cloud-dependencies (Greenwich.RELEASE) 可以完美地运行此解决方案。但是使用最新版本的 Spring-Boot-Parent (2.3.1.RELEASE) 和 spring-cloud-dependencies (Hoxton.SR6) 我遇到了以下异常。网关应用正常启动,没有报错。

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

有没有其他人也遇到同样的问题并且知道如何解决这个问题?

最佳答案

问题是,the greenwich version of those apis was beta .现在 CACHED_REQUEST_BODY_ATTR 中预期的对象必须是 PooledDataBuffer。所以我现在相应地更改了我的代码。现在看起来如下所示:

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
                    
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

更新类后,它现在按预期工作。

关于java - Spring Gateway AsyncPredicate 不适用于 react 堆和通量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63114188/

相关文章:

spring-boot - 如何防止 Redis 写入匿名用户 session

java - 如何更改H2数据库的密码加密?

java - 如何强制Eclipse使用maven进行自动构建

使用配置文件属性过滤 Maven 资源

maven - 覆盖数据 block 依赖

java - 如何在 Talend Open Studio 上设置类路径

java - 将字符串与类似字符串的模板进行比较

java - 使用 servlet 将动态图像传递到 JSP

java - Spring Cloud Contract 中生成的测试失败

java - 是否可以在 UI Automator Android 中进行 API 调用