java - 如何异步调用多个可观察的调用,同时在这些调用之前和之后同步执行一些计算?

标签 java rx-java reactive-programming jersey-client reactivex

我是 RxJava 新手。我有一些返回 Observables 的 Jersey RxJava 客户端。我需要进行一次调用来获取一些数据,该数据将成为我接下来 3 次调用的输入。我希望这些电话能够并行进行。最后,我想在所有需要所有数据的调用完成后进行计算。其外观如下:

interface Service {
  Observable<ResultA> callServiceA(InitialData input);
  Observable<ResultB> callServiceB(ResultA resultA);
  Observable<ResultC> callServiceC(ResultA resultA);
  Observable<ResultD> callServiceD(ResultA resultA);
  FinalResult simpleCalculation(ResultA a, ResultB b, ResultC c, ResultD d);
}

class MyClass{

   @Autowired
   ExecutorService myExecutorService;

   Observable<FinalResult> myMethod(InitialData initialData){
   /* Make call to ServiceA, get the results, then make calls to services B, C, and D in parallel (on different threads), finally perform simpleCalculation, and emit the result */
   }
}

最佳答案

flatMap()zip() 在这种情况下是你的 friend 。

Observable<FinalResult> myMethod(InitialData initialData) {
    return service
            .callServiceA(initialData)
            .flatMap(resultA -> Observable.zip(
                    service.callServiceB(resultA),
                    service.callServiceC(resultA),
                    service.callServiceD(resultA),
                    (resultB, resultC, resultD) -> 
                      service.simpleCalculation(resultA, resultB, resultC, resultD))
            );
}

使用返回可观察值将如下所示:

Subscription subscription =
        myMethod(new InitialData())
                .subscribe(finalResult -> {
                            // FinalResult will end up here.
                        },
                        throwable -> {
                            // Handle all errors here.
                        });

关于java - 如何异步调用多个可观察的调用,同时在这些调用之前和之后同步执行一些计算?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38625025/

相关文章:

java - 返回 json 意外,在 Spring 中将 "links"拼写为 "_links"并且结构不同

java - URL 标识符的加密算法

java - 线程本身是否有可能死锁?

java - RxJava - 使用 Single.Error/Observable.error 与抛出异常

rx-java - 递归地将Rx单例组合到Observable中

java - 使用 RxJava -Observables 进行异步调用

rx-java - 为什么在 replay() 之前调用publish()很重要

java - 当我运行 `./gradlew wrapper` 时,Minecraft Forge 帽子导致了此错误?

javascript - Baconjs 永无止境的流

java - Mono/Flux 可以重用/它们是不可变的+无状态的吗?