我的要求:
- N 个 Retrofit 并行调用
- 等待所有调用完成(成功或失败)
- 如果 k (0<= k < N) 次调用失败,它们不应阻止其他调用。想象一下,失败的调用可以返回
null
或错误对象,而成功的调用返回ResponseBody
对象。 - 可选:如果 RxJava 有一种简单的方法来区分每次成功或失败的调用,那很好,如果没有,我将解析响应并自己弄清楚
我有什么:
Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();
Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
@Override
public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
return null;
}
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
Logger.e(throwable, "some error with one of the apis?");
return Observable.empty();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Logger.i("onCompleted");
}
@Override
public void onError(Throwable e) {
Logger.e(e, "onError");
}
@Override
public void onNext(Object o) {
Logger.i("onNext " + o);
}
});
我得到的输出:
some error with one of the apis?
// stacktrace of the error
onCompleted
我是 RxJava 的新手,非常困惑。我在 StackOverflow 上找到了一些答案,说 zip
做了类似的事情,但它离我的要求更远。我猜其中一个“组合”运算符 + 适当的异常处理将满足我的需要。到目前为止真的很难弄明白
我使用的版本:
compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
最佳答案
您无法通过
combineLast
或zip
实现并行,rxjava
将在我的测试中按顺序执行和发出您的项目。如果您的任务之一失败,您的
Func2#call
将不会被调用,而是提交onError
。你甚至无法通过这种方式获得其他成功任务的结果。解决方案是
flatMap
,这是rxjava
中实现并发的传统方式。它还满足您的其他要求。
这是一个小而完整的例子。
我使用一个简单的网站服务来测试。
我用一个Semaphore
来等待所有任务完成,你完全可以忽略它。并且我在 http 请求中添加了日志记录以便更好地理解,您也可以完全忽略它。
public interface WebsiteService {
@GET
Observable<ResponseBody> website(@Url String url);
}
然后我用下面的代码用rxjava
测试结果。
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
.build();
WebsiteService websiteService = retrofit.create(WebsiteService.class);
final Semaphore s = new Semaphore(1);
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable<ResponseBody> first = websiteService.website("http://github.com");
Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
Observable<ResponseBody> third = websiteService.website("http://notexisting.com");
final int numberOfCalls = 3; // testing for three calls
Observable.just(first, second, third)
.flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
@Override
public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
return responseBodyObservable.subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<ResponseBody>() {
private int currentDoneCalls = 0;
private void checkShouldReleaseSemaphore() {
if (currentDoneCalls >= numberOfCalls) {
s.release();
}
}
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ResponseBody responseBody) {
System.out.println("Retrofit call success " + responseBody.contentType());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Retrofit call failed " + e.getMessage());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onComplete() {
System.out.println("onComplete, All request success");
checkShouldReleaseSemaphore();
}
});
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("All request done");
s.release();
}
我使用 rxjava2
并改造 adapter-rxjava2
进行测试。
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'
已更新
RxJava2的介绍页来自github指出了实现并行的实用方法。
Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator
flatMap
does this...
虽然这个例子是基于RxJava2
,但是操作flatMap
是
已经存在于 RxJava
中。
关于android - 如何在错误情况下非阻塞地组合多个 RxJava 链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44643106/