asynchronous - 如何使用 RxJava 发出多个 API 请求并将它们组合起来?

标签 asynchronous rx-java observable nonblocking rx-java2

我必须进行 N 次 REST API 调用并合并所有调用的结果,或者如果至少有一个调用失败(返回错误或超时)则失败。
我想使用 RxJava 并且我有一些要求:

  • 能够在某些情况下配置每个单独的 api 调用的重试。我的意思是,如果我有一次重试 = 2 并且我发出 3 个请求,每个请求最多重试 2 次,总共最多 6 个请求。
  • 快速失败!如果一个 API 调用失败了 N 次(其中 N 是重试的配置),其余请求是否尚未结束并不重要,我想返回一个错误。

  • 如果我想用单个线程发出所有请求,我需要一个异步 Http 客户端,不是吗?

    谢谢。

    最佳答案

    您可以使用 Zip运算符(operator)在请求结束后将所有请求压缩在一起,并在那里检查是否所有请求都成功

     private Scheduler scheduler;
    private Scheduler scheduler1;
    private Scheduler scheduler2;
    
    /**
     * Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
     * By default Rx is not async, only if you explicitly use subscribeOn.
     */
    @Test
    public void testAsyncZip() {
        scheduler = Schedulers.newThread();
        scheduler1 = Schedulers.newThread();
        scheduler2 = Schedulers.newThread();
        long start = System.currentTimeMillis();
        Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                .concat(s3))
                .subscribe(result -> showResult("Async in:", start, result));
    }
    
    private Observable<String> obAsyncString() {
        return Observable.just("Request1")
                .observeOn(scheduler)
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> "Hello");
    }
    
    private Observable<String> obAsyncString1() {
        return Observable.just("Request2")
                .observeOn(scheduler1)
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> " World");
    }
    
    private Observable<String> obAsyncString2() {
        return Observable.just("Request3")
                .observeOn(scheduler2)
                .doOnNext(val -> {
                    System.out.println("Thread " + Thread.currentThread()
                            .getName());
                })
                .map(val -> "!");
    }
    

    在这个例子中,我们只是连接结果,但不是这样做,你可以检查结果并在那里执行你的业务逻辑。

    您也可以考虑mergecontact还。

    您可以在这里查看更多示例 https://github.com/politrons/reactive

    关于asynchronous - 如何使用 RxJava 发出多个 API 请求并将它们组合起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45448690/

    相关文章:

    javascript - $.when.apply($, someArray) 是做什么的?

    java - 使用 Jersey 在运行时决定同步或异步响应

    javascript - NodeJS 异步系列不按顺序工作

    java - RxJava : chaining results from map methods

    rx-java - RxJava 中错误恢复时如何处理发散流?

    c# - asp .net MVC 5 中的异步任务

    java - 如何使一个 Observable 发出由 RxJava 2 中另一个 Observable 发出的组合项?

    kotlin - RxJava可观察到错误,可从数据库中获取项目,但仍通过传递错误

    javascript - 如何在 Ionic 3 中搜索数组项?

    javascript - RxJS 在分区子可观察对象之间共享父可观察对象