mono - 合并两个 Mono 并得到一个 Flux。然后从该 Flux 中提取 Mono

标签 mono spring-webflux project-reactor flux reactor

我有两个 Mono<T>我从两个不同的来源得到的让我们说KAFKA .

我的目的是合并两者 Mono进入Flux<T>1

然后使用 public final Mono<T> reduce(BiFunction<T,T,T> aggregator) Flux中的方法创建最终的 Mono (因为上述两个 Mono 的响应时间可能会有所不同)。 2

方法:

方法有很多如contact , zip , zipWith用于Flux 。我如何找到正确的使用方法(两个 MonoFlux 转换,即 1)。

这是REDUCE方法确实正确,还是可以采取其他措施来即兴发挥(2)?谢谢。

最佳答案

如果您确实想使用 Flux 来执行此操作,那么您可能需要使用 merge(),类似于:

Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));

...其中 foo() 实现了问题中 reduce 方法的功能,将发出的两个对象组合成一个值。您不会想使用 concat() ,除非您想一次订阅每个 Mono 一个,等待每个完成,而不是一起订阅 - 并且Flux.zipXXX 系列运算符将用于将单独的通量压缩在一起,因此您不会希望这样。

但是,我认为您在这里对于两个值没有完全正确的方法 - 如果您想将两个 Mono 发布者放入 Flux 中,然后立即减少它们返回到 Mono,那么使用 Flux 根本没有多大意义,因为您必须等待两个发布者完成才能发出任何内容,然后您只需发出一个值。

相反,我建议使用 this variant of Mono.zip() ,它允许您一次性完成所需的一切,例如:

Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));

关于mono - 合并两个 Mono 并得到一个 Flux。然后从该 Flux 中提取 Mono,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67461029/

相关文章:

c# - Begin-EndReceive 看不到到达的数据

spring - 如果 webflux mono 的条件在 kotlin 中为真,则仅在 mongo db 中存储用户

java - 如何在项目reactor中设置阻塞异步请求/响应?

c# - 我可以使用 LINQ IEnumerable 结果作为 Gtk.TreeView 的数据源吗?

c# - 连接到具有特定实例名称的 sql server 时 Mono 崩溃

C# Mono/.NET差分CAD软件检索

java - 无阻塞地将 Mono 转换为 Pojo

java - 无法在 Spring Webflux 2.1.0.RELEASE 中启动 Netty 服务器

spring-boot - 使用 Spring boot + WebFlux 处理全局错误

java - 何时使用 Mono.never()?