java - 使 RxJava 运算符链并发

标签 java asynchronous reactive-programming rx-java

我刚刚开始使用 RxJava。我一直在尝试构建一个数据管道,从不同来源下载大量数据并以并发方式将数据插入数据库。

我的基本管道形式如下所示:

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

每当我调用observeOn而不是运行并打印出上面打印出的所有数字时,什么都不会打印出来。为什么是这样?我希望下一个 concatMap 和 subscribe 也将只使用计算调度程序。代码贴在下面。

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .observeOn(Schedulers.computation())
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

最佳答案

这是一个猜测,因为您没有提供上下文,但是如果您要更改线程,则必须阻塞,因为主线程没有被阻塞,并且您可能在其他调度程序有机会运行之前终止:

Observable.range(1, 5)
        .concatMap((i) -> {
            return Observable.range(i, 2);
        })
        .observeOn(Schedulers.computation())
        .concatMap((i) -> {
            return Observable.range(i, 2);
        })
        .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
// block to let other schedulers finish
Thread.sleep(3000);

关于java - 使 RxJava 运算符链并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34804376/

相关文章:

python - 在 WSGI 容器中使用扭曲的词

c# - 如何获取 N 个热 Observable<decimal> 实例的 "last"项的总和?

java - Spring WebFlux switchIfEmpty 返回不同的类型

java - Reactor 2.0 中的多线程 - 为什么我不能将信号输出到多个线程

java - Runtime.getRuntime().exec 中的命令不起作用

java - Android SwipeViews + TitleStrip + ListView

javascript - Meteor/Node.js : Multiple http requests within a for loop, 在时间间隔内均匀分布?

c# - 只初始化一次异步模式

java - 带参数的 Spring mvc 重定向模式

java - 在 swift 中转义闭包以及如何在 Java 中执行它