我是响应式编程的新手。我看到可以压缩两个单声道以生成结果:
Mono<Info> info = Mono.just(id).map(this::getInfo).subscribeOn(Schedulers.parallel());
Mono<List<Detail>> detail= Mono.just(petitionRequest).map(this.service::getDetails)
.subscribeOn(Schedulers.parallel());
Flux<Generated> flux = Flux.zip(detail, info, (p, c) -> {
Generated o = Generated.builder().info(c).detail(p).build();
return o;
});
据我所知,当我调用 flux.blockFirst()
如何将另一个单声道合并到现有的两个单声道以生成结果? Flux.zip 只接受两个单声道。
提前致谢。
最佳答案
首先,由于您正在压缩 Monos,因此使用 zip 是有意义的来自 Mono 的运算符而不是 Flux。
它有多个重载版本,可以接受任意数量的 Monos。
此外,如果 this.service::getDetails
和 this::getInfo
正在阻塞 IO 操作(HTTP 请求、数据库调用等),那么您应该使用 elastic调度器而不是并行调度器,后者用于 CPU 密集型操作。
示例代码:
Mono<Info> info = Mono.just(id)
.map(this::getInfo)
.subscribeOn(Schedulers.elastic());
Mono<List<Detail>> detail= Mono.just(petitionRequest)
.map(this.service::getDetails)
.subscribeOn(Schedulers.elastic());
Mono<Description> description = Mono.just(id)
.map(this::callService)
.subscribe(Schedulers.elastic());
Mono.zip(info, detail, description)
.map(this::map);
private Generated map(Tuple3<Info, List<Detail>, Description> tuple3)
{
Info info = tuple3.getT1();
List<Detail> details = tuple3.getT2();
Description description = tuple3.getT3();
// build output here
}
关于java - 从 3 个不同的单声道创建实体,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57141596/