project-reactor - 是否可以并行启动 Mono 并汇总结果

标签 project-reactor spring-webflux

我知道可以链接 Mono 的,例如,...

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());

这将链接并且 resultBMno 将在 resultAMno 返回时运行....

所以我的问题是,是否可以并行启动 2 个 Mono,并且当两个返回都继续使用另一个 Mono 时?

我认为它看起来像这样......
Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);

但我不知道这将并行运行,或者我能做些什么来并行运行......

谢谢回答....

最佳答案

2 种语义,1 种使它们并行运行的方法

我在下面介绍的两个选项都需要一些额外的调整来使 A 和 B Mono并行运行:即每个Mono应该使用 subscribeOn(Scheduler)摆脱它们合并的共同点。

如果只关心A和B的完成

使用when收听 A 和 B 完成和 then继续一个完全不同的Mono :

Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .then(Mono.just("A and B finished, I don't know their value"));

如果您关心 A 和 B 值

使用zip + map/flatMap取决于你想对结果做什么。
Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");

或者
Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));
then将忽略以前的数据,因此使用 zip 没有多大意义在它之前。

还有,zip将导致空 Mono如果 A 或 B 之一为空!
使用switchIfEmpty/defaultIfEmpty以防止这种情况。

关于project-reactor - 是否可以并行启动 Mono 并汇总结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48172582/

相关文章:

spring - 为什么默认配置的spring webflux中没有异常堆栈跟踪?

spring - 我可以使用 Spring 5 WebFlux WebClient 使用非响应式 REST API 服务吗?

java - 如何从另一个 Flux 中排除 Flux 中的所有元素

java - 需要帮助为返回 Flux 流作为输出的函数编写 junit 测试

java - 将上下文从一个通量/单声道传递到另一个

扩展 Flux/实现发布者并多次调用 s.onNext() 时,Spring 5 Reactive 失败

mono - 合并两个 Mono 并得到一个 Flux。然后从该 Flux 中提取 Mono

project-reactor - 如何在spring reactor中将两个发布者合二为一

spring - 如何从 Spring WebClient 获取响应 json

java - 如何在Spring WebFlux中设置ViewResolver