project-reactor - 为什么Thread.sleep()会触发对Flux.interval()的订阅?

标签 project-reactor

如果,在 main()方法,我执行这个

Flux.just(1,2)
    .log()
    .subscribe();

我在控制台中得到这个:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onComplete()

如果代替just()我用interval()方法:

Flux.interval(Duration.ofMillis(100))
    .take(2)
    .log()
    .subscribe();

这些元素不会被记录,除非我添加 Thread.sleep()这给了我:

[ INFO] (main) onSubscribe(FluxTake.TakeSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onComplete()

问题是:为什么我需要暂停线程才能真正触发订阅?

最佳答案

您需要等待主线程并让执行完成。您的主线程在生成下一个元素之前终止。您的第一个元素是在100 ms后生成的,因此您需要等待/阻止主线程。试试这个:

CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofMillis(100))
        .take(2)
        .doOnComplete(latch::countDown)
        .log()
        .subscribe();
latch.await(); // wait for doOnComplete

CountDownLatch :

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

关于project-reactor - 为什么Thread.sleep()会触发对Flux.interval()的订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61767404/

相关文章:

java - 使用 WebFlux 对特定映射进行并行 GET 请求

spring-webflux - 因为我的基本URI不固定,所以在Webflux中一次又一次地创建Webclient是否明智?

java - 方法引用 : cannot convert reactor. core.publisher.Mono<S> 到reactor.core.publisher.Mono<? 中的返回类型错误延伸R>

java - 如何等待多个 Flux 和 Mono 发布者同时完成

java - flatMap如何管理线程?

java - 无法通过@PathVariable绑定(bind)到Mono<String>?

spring - 使用cron在Spring中安排周期性的 react 任务?

spring-boot - Spring Boot 2 和带有反应器的 logback MDC

java - 将传统消息生成/队列转换为 react 器中的通量

java - WebFlux : how to work takeUntilOther() method?