我对 RxJava 比较陌生,并且我已经使用运算符有一段时间了。
我看到这个小例子,它在短时间间隔(1s)后发出项目:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
return d + " " + t;
}).toBlocking().forEach(System.out::println);
这是可行的,但是当我删除将源转换为 BlockingObservable 的 toBlocking()
时,程序会执行并以没有输出的方式结束。
我通常会查看弹珠图来正确理解事物: http://reactivex.io/documentation/operators/zip.html
在最后一句中它说:它只会发射与发射最少项目的源 Observable 发射的项目数量一样多的项目。
这是否意味着,data
Observable 在不到 1 秒的时间内发出所有项目,并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?
我需要清楚地了解正在发生的事情,以及是否有其他方法来处理类似的情况。有人吗?
最佳答案
Observable.interval
在幕后使用 Scheduler
。它将从另一个线程发出。同时,主线程已完成所有编写并将退出。大概您的 main
方法中有此代码,这就是您的程序退出的原因。
在真实系统中,这不应该成为问题(除非您的真实系统是包含此代码的 main
方法)。
在示例程序中,您可以通过从 stdin 读取一个字节来导致主线程阻塞。像这样的事情:
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
.subscribe(System.out::println);
System.in.read();
关于java - Rx(Java 响应式(Reactive)扩展)具有时间间隔的 Zip 运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29980640/