java - Flux.take(x) 或 Flux.all(..) 不会等待所有排放

标签 java rx-java project-reactor

我有一个flux这是由 Iterable 构建的8 个元素 ( Flux.fromIterable(..) )。 对于每个通量排放,我想异步调用一个方法。 我尝试了各种方法 dispatchOnpublishOn但这不起作用,最终我选择了 map(CompletableFuture.supplyAsync(..), executor)这会改变 fluxflux<CompletableFuture<Boolean>> .

现在我只想在最后一项完成时继续流程。 我尝试过 all(..) ,并与 take(size of the Iterable)但在这两种情况下,流程在所有元素完成之前都会继续。 我认为这是因为我的执行器只有 4 个线程,并且 CompletableFuture 需要一些时间。 -s 添加到助焊剂中。

为什么不all(..)take(8)等待通量完成? 我怎样才能让它等待?

代码:

    Mono
    .fromFuture(dbUtil.getEntity(id))
    .doOnError(t -> {
        ...
        return;})
    .doOnSuccess(s -> log.info("Got it: " + s))
    .flatMap( s -> 
        Flux.fromIterable(s.getItemsMap().entrySet())
            .map( e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
            .take(s.getItemsMap().entrySet().size()) 
    )
    .all(...)
    .consume(b -> done(b));

最佳答案

如果你想并行计算每个项目,你可以使用 flatMap+just+subscribeOn+map。我对 Reactor 的调度程序类型不太熟悉,所以我给你一个 RxJava 中的例子:

ExecutorService exec = ...
Scheduler scheduler = Schedulers.from(exec);

Observable.from(...)
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v)))
.subscribe(...);

关于java - Flux.take(x) 或 Flux.all(..) 不会等待所有排放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36166339/

相关文章:

java - 多个线程迭代同一个 map

java - 如何在循环中声明标签?

java - 用于字段验证的 REST API

java - 将 json 转换为对象

spring-webflux - 等待多个 Spring WebClient Mono 响应

Spring MVC 到 Spring Webflux 迁移 - block 与订阅

android - RxAndroid : Create Simple Hot Observable

android - RxJava2 Android Schedulers.io() 和 Schedulers.newThread()

java - RXJava逻辑图链

java - react 流 : How to wait for all publishers, 按 key ?