java - rxjava2 - 在线程池上执行任务的简单示例,在单个线程上订阅

标签 java kotlin rx-java2

我正在尝试以下任务来了解 RxJava:

  • 给定一个 URL 列表
  • 对线程池中的每个 URL 进行 HTTP 请求
  • 对于每个结果,将一些数据插入 SQLite 数据库(这里没有多线程)
  • 阻塞该方法直到它完成

所以我在 Kotlin 中尝试了一下:

val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name }

我希望它能打印出来

pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....

无论如何打印:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1

谁能纠正我对它的工作原理的误解?为什么它不使用线程池的所有线程?如何让订阅者在主线程上运行或阻塞直到完成?

最佳答案

Rx 并不意味着并行执行服务,为此使用 Java 的流 api。 Rx 事件是同步的,随后会流过流。在构建流时,observeOn 将请求一个线程一次,并在该线程上一个一个地处理发射。

您还期望 subscribe 在主线程上执行。 observeOn 切换线程,所有下游事件都发生在该线程上。如果要切换到主线程,则必须在 subscribe 之前插入另一个 observeOn

关于java - rxjava2 - 在线程池上执行任务的简单示例,在单个线程上订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46027973/

相关文章:

java - 进程循环运行而不是每次都记录

Java 动态类在 webapp 中加载

kotlin - 高阶函数作为枚举参数

android - 在使用 kotlin 的 android studio 中出现错误

java - 如何在 Maybe.create(emitter) 中发出 Maybe.empty ?

java - 单击按钮后将对象添加到 JPanel

java - 根据平台放置程序图标

Android Jetpack Compose - 图像无法缩放到框的宽度和高度

java - RxJava 使用优化请求

android - RxJava2 结合不同类型的 Observable