如果,在 main()
方法,我执行这个
Flux.just(1,2)
.log()
.subscribe();
我在控制台中得到这个:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onComplete()
如果代替just()
我用interval()
方法:
Flux.interval(Duration.ofMillis(100))
.take(2)
.log()
.subscribe();
这些元素不会被记录,除非我添加 Thread.sleep()
这给了我:
[ INFO] (main) onSubscribe(FluxTake.TakeSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) onNext(0)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onComplete()
问题是:为什么我需要暂停线程才能真正触发订阅?
最佳答案
您需要等待主线程并让执行完成。您的主线程在生成下一个元素之前终止。您的第一个元素是在100 ms
后生成的,因此您需要等待/阻止主线程。试试这个:
CountDownLatch latch = new CountDownLatch(1);
Flux.interval(Duration.ofMillis(100))
.take(2)
.doOnComplete(latch::countDown)
.log()
.subscribe();
latch.await(); // wait for doOnComplete
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
关于project-reactor - 为什么Thread.sleep()会触发对Flux.interval()的订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61767404/