java - Spring 启动+ webflux : context lost when running some steps in parallel

标签 java spring-boot spring-webflux project-reactor

Spring 启动:2.1.3.RELEASE

你好,

我正在尝试使用 spring webflux 的上下文功能来携带一个简单的变量。我有一个 WebFilter 设置了这样一个变量的上下文,我尝试在我的流量/流的不同阶段在我的 Controller 中使用它。在某些时候,我在调用 Flux 类的方法“parallel()”后丢失了它。

  • 过滤器:
public class TestFilter implements WebFilter {

    private Logger LOG = LoggerFactory.getLogger(TestFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange)
            .doOnEach(voidSignal -> System.out.println("filter:"+voidSignal.getContext().getOrEmpty("blob"))).subscriberContext(Context.of("blob", "kapoue"));
    }

}
  • 控制者:
@RestController
@RequestMapping(TestControllerWebFlux.ROOT)
public class TestControllerWebFlux {

    static final String ROOT = "/flux";
    static final String TEST = "/test";

    private WebClient webClient = WebClient.create();

    @GetMapping(
            value = TEST,
            produces = {MediaType.APPLICATION_JSON_VALUE})
    public Mono<String> test() {
        System.out.println("controller1:"+Thread.currentThread());

        Flux<String> call = webClient.get().uri("http://localhost:" + 8080 + ROOT + "/test2").retrieve().bodyToFlux(Result.class).map(Result::getValue);

        return call.map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller2:"+stringSignal.getContext().getOrEmpty("blob")))
            .parallel()
            .doOnEach(stringSignal -> System.out.println("controller3:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller4:"+stringSignal.getContext().getOrEmpty("blob")))
            .reduce((s, s2) -> s+s2)
            .doOnEach(stringSignal -> System.out.println("controller5:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> {
                System.out.println("controller6:"+Thread.currentThread());
                return s;
            });
    }

    @GetMapping(
        value = "test2",
        produces = {MediaType.APPLICATION_JSON_VALUE})
    public Flux<Result> test2() {
        return Flux.just(new Result("0"), new Result("0"), new Result("0"));
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Result {
        private String value;
    }
}

我所做的就是调用 http://localhost:8080/flux/test/端点,我明白了:

controller1:Thread[reactor-http-nio-2,5,main] controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller5:Optional[kapoue] controller6:Thread[reactor-http-nio-2,5,main] filter:Optional[kapoue]

如您所见,上下文在“并行”方法后立即丢失,并在减少后以某种方式恢复。

这是一个错误还是我不应该在这样的调用之后尝试并行运行?

提前感谢您的帮助。

最佳答案

这看起来像是 Reactor 中的错误。我举报了:https://github.com/reactor/reactor-core/issues/1656

关于java - Spring 启动+ webflux : context lost when running some steps in parallel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55606752/

相关文章:

spring - RestController 与 Spring WebFlux :Required parameter is not present

spring - HATEOAS 关于 Spring Flux/Mono 响应

spring-boot - Spring WebFlux 认证的 WebSocket 连接

java - 更新struts2中迭代器标签的值

java spring boot 在启动之前从数据库获取值

spring-boot - Spring Boot 和 Kafka : Failed to send SSL Close message

spring-boot - 使用 JpaRepository 将 protobuf 直接保存为实体

java - 如何拆分两个连续的括号?

java - 如何在spring mvc中的每个请求之前附加一个单词?

java - 在模型上重复纹理