java - 使用RxJava异步调用多个同步任务

标签 java rx-java

我有一个由 Futures 表示的异步任务,在一个单独的线程池中执行,我想使用 RxJava 加入该线程池。使用 Java 5 构造的“旧”方法如下(省略收集结果):

final Future<Response> future1 = wsClient.callAsync();
final Future<Response> future2 = wsClient.callAsync();
final Future<Response> future3 = wsClient.callAsync();
final Future<Response> future4 = wsClient.callAsync();

future1.get();
future2.get();
future3.get();
future4.get();

这会阻塞我当前的线程,直到所有 future 都完成,但调用将是并行的,整个操作只会花费与最长调用相同的时间。

我想使用 RxJava 做同样的事情,但在如何正确建模方面我有点菜鸟。

我尝试了以下方法,它似乎有效:

Observable.from(Arrays.asList(1,2,3,4))
            .flatMap(n -> Observable.from(wsClient.callAsync(), Schedulers.io()))
            .toList()
            .toBlocking()
            .single();

这种方法的问题是我引入了 Schedulers.io 线程池,这会导致不必要的线程切换,因为我已经阻塞了当前线程(使用 toBlocking())。 有什么方法可以对 Rx 流程进行建模以并行执行任务,并阻塞直到所有任务完成?

最佳答案

您应该使用zip功能。 例如这样:

Observable.zip(
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        Observable.from(wsClient.callAsync(), Schedulers.io()),
        (response1, response2, response3, response4) -> {
            // This is a zipping function...
            // You'll end up here when you've got all responses
            // Do what you want with them and return a combined result
            // ...
            return null; //combined result instead of null
        })
        .subscribe(combinedResult -> {
            // Use the combined result
        });

Observable.zip 还可以与 Iterable 一起使用,因此您可以包装 Observable.from(wsClient.callAsync(), Schedulers.io()) ; 周围有一个(返回其中的 4)。

关于java - 使用RxJava异步调用多个同步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35627010/

相关文章:

python - RxPy - 为什么排放与合并运算符交错?

android - 如何显示来自 flatMap 运算符的可观察对象的内容

java - 如何编写根据类型执行操作的代码

java - Android 中的 Activity 启动时如何显示所有数据库记录?

java - PNG 图像在 Itext7 中被损坏

android - RxAndroid、事件总线和 Activity 生命周期

java - Spring Security 拦截 url 似乎被跳过/忽略

java - Hibernate 拦截器 - 加载事件后

android - RxTextView 文本更改 + 改造调用导致 InterruptedIOException

java - 如何处理 RxJava 中观察者的 onNext 抛出的异常?