顺便说一句,我还在学习 weblux; 我不知道这是否可能,或者我的方法是错误的,但考虑到这种并行通量。
Flux<String> enablers = Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential();
谁调用了具有 Web 客户端请求的方法 (service.getAMino)
webClient.post()
.uri(url)
.headers(headers -> headers.addAll(httpHeaders))
.body(BodyInserters.fromObject(request))
.retrieve()
.bodyToMono(entity2.class);
我需要等待启用器通量的流结束并处理其中的所有响应,原因是如果其中一个给我错误或否定响应,我将不会运行另一个并行通量 for blockers
Flux<String> blockers = Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential();
我想到了“zip”方法,但是这个合并了两个响应,这不是我想要的 如果有人能帮我解决这个问题。
更新
enablers. //handle enablers response and if error return a custom Mono<response> with .reduce
如果 enablers
的句柄中没有错误,则继续使用其他 Flux
进行 .thenMany
最佳答案
我在第一个flux
中找到了通过条件any
来做到这一点的方法,就像这样
Flux.fromIterable(enablersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.getAMono(string, entity, element))
.sequential()
.any(element -> *stuff here)//condition
.flatMap(condition->{
if(condition.equals(Boolean.FALSE)){
return Flux.fromIterable(blockersList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(element -> service.callAMono(string, entity, element))
.sequential()
.reduce(**stuff here)// handle noError response and return;
}
return Mono.just(**stuff here);//handle error response and return
});
如果有其他方法可以做到这一点,我很高兴您将其发布在这里,谢谢,:D
关于java - 在另一个通量结束后执行并行通量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60119547/