我正在尝试以下任务来了解 RxJava:
- 给定一个 URL 列表
- 对线程池中的每个 URL 进行 HTTP 请求
- 对于每个结果,将一些数据插入 SQLite 数据库(这里没有多线程)
- 阻塞该方法直到它完成
所以我在 Kotlin 中尝试了一下:
val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
.observeOn(Schedulers.from(ex))
.map { Thread.currentThread().name }
.subscribe { println(it + " " + Thread.currentThread().name }
我希望它能打印出来
pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
无论如何打印:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
谁能纠正我对它的工作原理的误解?为什么它不使用线程池的所有线程?如何让订阅者在主线程上运行或阻塞直到完成?
最佳答案
Rx 并不意味着并行执行服务,为此使用 Java 的流 api。 Rx 事件是同步的,随后会流过流。在构建流时,observeOn 将请求一个线程一次,并在该线程上一个一个地处理发射。
您还期望 subscribe
在主线程上执行。 observeOn
切换线程,所有下游事件都发生在该线程上。如果要切换到主线程,则必须在 subscribe
之前插入另一个 observeOn
。
关于java - rxjava2 - 在线程池上执行任务的简单示例,在单个线程上订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46027973/