java - RxJava 异步订阅

标签 java multithreading asynchronous system.reactive rx-java

我有一个任务列表,应该在一个新线程中一个一个地处理,然后结果应该由某个主线程在一个方法中显示。 然而这似乎不起作用,flatMap 方法在主线程中被调用。

在这种情况下,为什么 subscribeOn 方法不处理“线程切换”?

在另一个线程中执行某些工作的更好模式是什么? (除了使用 Observable.create 和手动创建一个新线程,这是非常冗长的)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }

                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread

最佳答案

警告:我不是 Java 程序员,而是 C#,所以所有奇怪的驼峰式方法名称都让我感到害怕和困惑。

如果您希望为 flatMap 操作创建一个新线程,则 subscribeOn 位于错误的位置。将其插入 fromflatMap 之间。参见 this answer subscribeOnobserveOn 的完整解释 - 它是为 .NET 编写的,但原理是相同的。

我不熟悉 Java 中的任务,所以我不确定您的 Task 是否像 .NET 的 Task 以及 task.call( ) 是异步的并启动自己的线程 - 我猜不是你的问题,因为你说“......应该在 a 新线程中逐一处理的任务列表” .

newThread 调度器每个订阅者 使用一个新线程 - 因为 flatMap 将进行单个订阅,所有 task.call 调用将在同一线程上进行,尽管这将来自 from 运算符的线程。

如果 task.call 实际上是异步的,那么结果将根据它引入的并发性返回,这将独立于 Rx 的语义。

无论哪种方式,(正确放置的)observeOn 都会将结果传递给主线程上的 this::show

关于java - RxJava 异步订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28262665/

相关文章:

c# - 如何清理 MonoTouch 中的线程?

multithreading - TEventLog 组件线程安全吗?

spring-mvc - Spring MVC 的 @Async、DeferredResult 和 Callable 的区别

asp.net-mvc - 异步 MVC 操作的工作线程从何而来?

java - 为什么要乘而不是求和

java - 对对象数组列表中的整数进行排序

java - 为什么 en_GB 语言环境认为 1 月 1 日是一年中的第 52 周?

java - 如何获取对象列表的点类?

python - 提升 python 服务器的性能

javascript - 在 Node.JS/NPM 的 MySQL 库中使用带有查询的 Promise