我有一个用例,我需要聚合来自多个 Observable 对象的已完成线程响应并返回给客户端。我的问题是如何使用 rX Java 来实现它。这里我写了一个代码片段,但这个代码片段的问题是超时后不会返回任何内容。
Observable<AggregateResponse> aggregateResponse = Observable.
zip(callServiceA(endpoint), callServiceB(endpoint), callServiceC(endpoint),
(Mashup resultA, Mashup resultB, Mashup resultC) -> {
AggregateResponse result = new AggregateResponse();
result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
return result;
}).timeout(5, TimeUnit.SECONDS);
订阅者
aggregateResponse.subscribe(new Subscriber<AggregateResponse>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
//Timeout execute this rather than aggregating the finished tasks
System.out.println(throwable.getMessage());
System.out.println(throwable.getClass());
}
@Override
public void onNext(AggregateResponse response) {
asyncResponse.resume(response);
}
});
最佳答案
你需要在每个Observable
上放置timeout
运算符,zip
将等待所有Observables发出一个值,然后再发出结果,因此,如果只有其中一个需要更长的时间,而其他的已经发出,则在压缩的 Observable 有机会发出之前,您将使用 timeout
(使用 onError
)来减少流.
假设您想忽略超时源,同时保留其余部分,您应该做的是向每个 Observable 添加超时运算符,并添加错误处理,例如 onErrorReturn对于每一个,错误返回都可以返回某种“空”结果(在 RxJava2 中不能使用 null),并且当聚合结果时忽略这些空结果:
Observable<AggregateResponse> aggregateResponse = Observable.
zip(callServiceA(endpoint)
.timeout(5, TimeUnit.SECONDS)
.onErrorReturn(throwable -> new Mashup()),
callServiceB(endpoint)
.timeout(5, TimeUnit.SECONDS)
.onErrorReturn(throwable -> new Mashup()),
callServiceC(endpoint)
.timeout(5, TimeUnit.SECONDS)
.onErrorReturn(throwable -> new Mashup()),
(Mashup resultA, Mashup resultB, Mashup resultC) -> {
AggregateResponse result = new AggregateResponse();
result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
return result;
});
关于java - 聚合已完成的线程并在超时后发送响应 rX Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45604378/