我必须进行 N 次 REST API 调用并合并所有调用的结果,或者如果至少有一个调用失败(返回错误或超时)则失败。
我想使用 RxJava 并且我有一些要求:
如果我想用单个线程发出所有请求,我需要一个异步 Http 客户端,不是吗?
谢谢。
最佳答案
您可以使用 Zip
运算符(operator)在请求结束后将所有请求压缩在一起,并在那里检查是否所有请求都成功
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
/**
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
private Observable<String> obAsyncString() {
return Observable.just("Request1")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("Request2")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("Request3")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
在这个例子中,我们只是连接结果,但不是这样做,你可以检查结果并在那里执行你的业务逻辑。
您也可以考虑
merge
或 contact
还。您可以在这里查看更多示例 https://github.com/politrons/reactive
关于asynchronous - 如何使用 RxJava 发出多个 API 请求并将它们组合起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45448690/