java - Rx(Java 响应式(Reactive)扩展)具有时间间隔的 Zip 运算符

标签 java multithreading reactive-programming rx-java

我对 RxJava 比较陌生,并且我已经使用运算符有一段时间了。

我看到这个小例子,它在短时间间隔(1s)后发出项目:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
        return d + " " + t;
    }).toBlocking().forEach(System.out::println);

这是可行的,但是当我删除将源转换为 BlockingObservable 的 toBlocking() 时,程序会执行并以没有输出的方式结束。

我通常会查看弹珠图来正确理解事物: http://reactivex.io/documentation/operators/zip.html

在最后一句中它说:它只会发射与发射最少项目的源 Observable 发射的项目数量一样多的项目。

这是否意味着,data Observable 在不到 1 秒的时间内发出所有项目,并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?

我需要清楚地了解正在发生的事情,以及是否有其他方法来处理类似的情况。有人吗?

最佳答案

Observable.interval 在幕后使用 Scheduler。它将从另一个线程发出。同时,主线程已完成所有编写并将退出。大概您的 main 方法中有此代码,这就是您的程序退出的原因。

在真实系统中,这不应该成为问题(除非您的真实系统是包含此代码的 main 方法)。

在示例程序中,您可以通过从 stdin 读取一个字节来导致主线程阻塞。像这样的事情:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
        .subscribe(System.out::println);

System.in.read();

关于java - Rx(Java 响应式(Reactive)扩展)具有时间间隔的 Zip 运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29980640/

相关文章:

java - 以编程方式找出有多少对象符合垃圾收集条件?

java - 设置 Eclipse 不在接口(interface)中的方法声明之间插入空行?

java - 使用 JSTL 循环

java - 线程和同步问题

c# - 关于任务延续的问题

java - 等待ParallelFlux完成

java 将句子拆分为单词

c++ - 等待多个 future ?

swift - 将模型与裸 RxSwift 和 BehaviourSubject 同步

java - RxJava : How to prepend startWith() default emit EVERY TIME parent observable emits?