我刚刚开始使用 RxJava。我一直在尝试构建一个数据管道,从不同来源下载大量数据并以并发方式将数据插入数据库。
我的基本管道形式如下所示:
Observable.range(1, 5)
.concatMap((i) -> {
return Observable.range(i, 2);
})
.concatMap((i) -> {
return Observable.range(i, 2);
})
.subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
每当我调用observeOn而不是运行并打印出上面打印出的所有数字时,什么都不会打印出来。为什么是这样?我希望下一个 concatMap 和 subscribe 也将只使用计算调度程序。代码贴在下面。
Observable.range(1, 5)
.concatMap((i) -> {
return Observable.range(i, 2);
})
.observeOn(Schedulers.computation())
.concatMap((i) -> {
return Observable.range(i, 2);
})
.subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
最佳答案
这是一个猜测,因为您没有提供上下文,但是如果您要更改线程,则必须阻塞,因为主线程没有被阻塞,并且您可能在其他调度程序有机会运行之前终止:
Observable.range(1, 5)
.concatMap((i) -> {
return Observable.range(i, 2);
})
.observeOn(Schedulers.computation())
.concatMap((i) -> {
return Observable.range(i, 2);
})
.subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
// block to let other schedulers finish
Thread.sleep(3000);
关于java - 使 RxJava 运算符链并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34804376/