我有一个flux
这是由 Iterable
构建的8 个元素 ( Flux.fromIterable(..)
)。
对于每个通量排放,我想异步调用一个方法。
我尝试了各种方法 dispatchOn
和publishOn
但这不起作用,最终我选择了 map(CompletableFuture.supplyAsync(..), executor)
这会改变 flux
至flux<CompletableFuture<Boolean>>
.
现在我只想在最后一项完成时继续流程。
我尝试过 all(..)
,并与 take(size of the Iterable)
但在这两种情况下,流程在所有元素完成之前都会继续。
我认为这是因为我的执行器只有 4 个线程,并且 CompletableFuture
需要一些时间。 -s 添加到助焊剂中。
为什么不all(..)
或take(8)
等待通量完成?
我怎样才能让它等待?
代码:
Mono
.fromFuture(dbUtil.getEntity(id))
.doOnError(t -> {
...
return;})
.doOnSuccess(s -> log.info("Got it: " + s))
.flatMap( s ->
Flux.fromIterable(s.getItemsMap().entrySet())
.map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
.take(s.getItemsMap().entrySet().size())
)
.all(...)
.consume(b -> done(b));
最佳答案
如果你想并行计算每个项目,你可以使用 flatMap+just+subscribeOn+map。我对 Reactor 的调度程序类型不太熟悉,所以我给你一个 RxJava 中的例子:
ExecutorService exec = ...
Scheduler scheduler = Schedulers.from(exec);
Observable.from(...)
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v)))
.subscribe(...);
关于java - Flux.take(x) 或 Flux.all(..) 不会等待所有排放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36166339/