我有两个 Mono<T>
我从两个不同的来源得到的让我们说KAFKA
.
我的目的是合并两者 Mono
进入Flux<T>
。 1
然后使用 public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
Flux
中的方法创建最终的 Mono
(因为上述两个 Mono
的响应时间可能会有所不同)。 2
方法:
方法有很多如contact
, zip
, zipWith
用于Flux
。我如何找到正确的使用方法(两个 Mono
到 Flux
转换,即 1)。
这是REDUCE
方法确实正确,还是可以采取其他措施来即兴发挥(2)?谢谢。
最佳答案
如果您确实想使用 Flux
来执行此操作,那么您可能需要使用 merge()
,类似于:
Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));
...其中 foo()
实现了问题中 reduce
方法的功能,将发出的两个对象组合成一个值。您不会想使用 concat()
,除非您想一次订阅每个 Mono
一个,等待每个完成,而不是一起订阅 - 并且Flux.zipXXX
系列运算符将用于将单独的通量压缩在一起,因此您不会希望这样。
但是,我认为您在这里对于两个值没有完全正确的方法 - 如果您想将两个 Mono
发布者放入 Flux
中,然后立即减少它们返回到 Mono
,那么使用 Flux
根本没有多大意义,因为您必须等待两个发布者完成才能发出任何内容,然后您只需发出一个值。
相反,我建议使用 this variant of Mono.zip()
,它允许您一次性完成所需的一切,例如:
Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));
关于mono - 合并两个 Mono 并得到一个 Flux。然后从该 Flux 中提取 Mono,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67461029/