java - 为什么我的 RxJava observable 没有触发订阅者?

标签 java multithreading asynchronous reactive-programming rx-java

我正在使用 RxJava,我想流式传输一千个连续的整数。然后我想把它们异步拆分成奇数流和偶数流,然后异步打印出来。

但是,我没有打印出任何内容,或者至少只有非常部分的输出。我错过了什么?我的时间安排有误吗?还是控制台在 Eclipse 中存在多线程问题?

public static void main(String[] args)  {

    List<Integer> values = IntStream.range(0,1000).mapToObj(i -> Integer.valueOf(i)).collect(Collectors.toList());

    Observable<Integer> ints = Observable.from(values).subscribeOn(Schedulers.computation());

    Observable<Integer> evens = ints.filter(i -> Math.abs(i) % 2 == 0);
    Observable<Integer> odds = ints.filter(i -> Math.abs(i) % 2 != 0);

    evens.subscribe(i -> System.out.println(i + " IS EVEN " + Thread.currentThread().getName()));

    odds.subscribe(i -> System.out.println(i + " IS ODD " + Thread.currentThread().getName()));
}

最佳答案

您正在使用运行守护线程的 Schedules.computation 启动处理管道。因此,当您的 main 线程结束时,这些线程会在处理您的可观察对象之前终止。

因此,如果您希望看到打印的结果,您可以让主线程等待结果(例如通过 Thread.sleep)或通过删除 subscribeOn< 订阅调用线程。还有一个选项可以创建您自己的调度程序,它将运行非守护线程。

关于java - 为什么我的 RxJava observable 没有触发订阅者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30037306/

相关文章:

java - 如何从用户读取字符串直到遇到换行符(Java)?

javascript - 检查变量是否为 "async function"类型

c - 在 C 中编写异步事件服务器的推荐模式

java - 如何进入无限循环直到输入有效数据?

java - 是否有像 "must avoid"这样的 javadoc 标签?

Android fragment 相互重叠

java - 程序在executeBatch上挂起

multithreading - Perl:特殊变量线程安全吗?

asynchronous - 预期为 `async` block ,发现了不同的 `async` block

java - 在 Spring Batch 中动态设置 gridSize(线程数)