我有一个由 Futures 表示的异步任务,在一个单独的线程池中执行,我想使用 RxJava 加入该线程池。使用 Java 5 构造的“旧”方法如下(省略收集结果):
final Future<Response> future1 = wsClient.callAsync();
final Future<Response> future2 = wsClient.callAsync();
final Future<Response> future3 = wsClient.callAsync();
final Future<Response> future4 = wsClient.callAsync();
future1.get();
future2.get();
future3.get();
future4.get();
这会阻塞我当前的线程,直到所有 future 都完成,但调用将是并行的,整个操作只会花费与最长调用相同的时间。
我想使用 RxJava 做同样的事情,但在如何正确建模方面我有点菜鸟。
我尝试了以下方法,它似乎有效:
Observable.from(Arrays.asList(1,2,3,4))
.flatMap(n -> Observable.from(wsClient.callAsync(), Schedulers.io()))
.toList()
.toBlocking()
.single();
这种方法的问题是我引入了 Schedulers.io 线程池,这会导致不必要的线程切换,因为我已经阻塞了当前线程(使用 toBlocking())。 有什么方法可以对 Rx 流程进行建模以并行执行任务,并阻塞直到所有任务完成?
最佳答案
您应该使用zip
功能。
例如这样:
Observable.zip(
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
Observable.from(wsClient.callAsync(), Schedulers.io()),
(response1, response2, response3, response4) -> {
// This is a zipping function...
// You'll end up here when you've got all responses
// Do what you want with them and return a combined result
// ...
return null; //combined result instead of null
})
.subscribe(combinedResult -> {
// Use the combined result
});
Observable.zip
还可以与 Iterable
一起使用,因此您可以包装 Observable.from(wsClient.callAsync(), Schedulers.io()) ;
周围有一个(返回其中的 4
)。
关于java - 使用RxJava异步调用多个同步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35627010/