我是 RxJava 新手。我有一些返回 Observables 的 Jersey RxJava 客户端。我需要进行一次调用来获取一些数据,该数据将成为我接下来 3 次调用的输入。我希望这些电话能够并行进行。最后,我想在所有需要所有数据的调用完成后进行计算。其外观如下:
interface Service {
Observable<ResultA> callServiceA(InitialData input);
Observable<ResultB> callServiceB(ResultA resultA);
Observable<ResultC> callServiceC(ResultA resultA);
Observable<ResultD> callServiceD(ResultA resultA);
FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d);
}
class MyClass{
@Autowired
ExecutorService myExecutorService;
Observable<FinalResult> myMethod(InitialData initialData){
/* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */
}
}
最佳答案
flatMap()
和 zip()
在这种情况下是你的 friend 。
Observable<FinalResult> myMethod(InitialData initialData) {
return service
.callServiceA(initialData)
.flatMap(resultA -> Observable.zip(
service.callServiceB(resultA),
service.callServiceC(resultA),
service.callServiceD(resultA),
(resultB, resultC, resultD) ->
service.simpleCalculation(resultA, resultB, resultC, resultD))
);
}
使用返回可观察值将如下所示:
Subscription subscription =
myMethod(new InitialData())
.subscribe(finalResult -> {
// FinalResult will end up here.
},
throwable -> {
// Handle all errors here.
});
关于java - 如何异步调用多个可观察的调用,同时在这些调用之前和之后同步执行一些计算?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38625025/