java - 在另一个通量结束后执行并行通量

标签 java spring spring-webflux

顺便说一句,我还在学习 weblux; 我不知道这是否可能,或者我的方法是错误的,但考虑到这种并行通量。

Flux<String> enablers = Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential();

谁调用了具有 Web 客户端请求的方法 (service.getAMino)

webClient.post()
              .uri(url)
              .headers(headers -> headers.addAll(httpHeaders))
              .body(BodyInserters.fromObject(request))
              .retrieve()
              .bodyToMono(entity2.class);

我需要等待启用器通量的流结束并处理其中的所有响应,原因是如果其中一个给我错误或否定响应,我将不会运行另一个并行通量 for blockers

Flux<String> blockers = Flux.fromIterable(blockersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.callAMono(string, entity, element))
                .sequential();

我想到了“zip”方法,但是这个合并了两个响应,这不是我想要的 如果有人能帮我解决这个问题。

更新

enablers. //handle enablers response and if error return a custom Mono<response> with .reduce

如果 enablers 的句柄中没有错误,则继续使用其他 Flux 进行 .thenMany

最佳答案

我在第一个flux中找到了通过条件any来做到这一点的方法,就像这样

Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential()
                .any(element -> *stuff here)//condition
                .flatMap(condition->{
                        if(condition.equals(Boolean.FALSE)){
                           return Flux.fromIterable(blockersList)
                                                   .parallel()
                                                   .runOn(Schedulers.elastic())
                                                   .flatMap(element -> service.callAMono(string, entity, element))
                                                   .sequential()
                                                   .reduce(**stuff here)// handle noError response and return;
                          }
                          return Mono.just(**stuff here);//handle error response and return
                 });

如果有其他方法可以做到这一点,我很高兴您将其发布在这里,谢谢,:D

关于java - 在另一个通量结束后执行并行通量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60119547/

相关文章:

java - 我可以在soap处理程序中获取Java类名吗?

javascript - 使用 Spring 3.2.0.RELEASE 的 DeferredResult 反向 ajax。无法在 IE 中工作

java - Spring 服务应该是 null 安全的吗?

netty - 将 spring-webflux 微服务切换到 http/2 (netty)

java - Webflux postgresql

java - oninfowindowclick 仅在有多个标记的情况下与标记信息一起使用

java - 使用命名空间解码 XML 响应

java - ImapIdleChannelAdapter javax.mail.AuthenticationFailedException : [ALERT] Too many simultaneous connections

spring - Spring 事务管理器是否将连接绑定(bind)到线程?

spring-webflux - 如何在 Flux 中迭代一个对象并对其进行操作?