问题就在这里。我有许多异步操作,其结果被聚合成一个并进一步处理的操作。但是,并非所有操作都是相同的,并且根据失败的操作,错误处理也有所不同。
详细来说,假设我们有操作 A、B 和 C。如果 A 失败,我们需要结束处理,但如果 B 或 C 失败,我们继续正常处理其他操作。
目前,我们使用倒计时锁存器和大量状态管理来实现这一目标,需要近一百行代码。我想将其转移到基于 RxJava 的实现。我的第一个想法是尝试 Observable.zip 运算符,但这将所有可观察值视为相等,但在本例中并非如此。我的另一个想法是链接调用,这是可行的,但这意味着所有操作不会同时启动,从而导致整体时间更长。
谁能指导我如何实现这一目标?
最佳答案
使用.onErrorResumeNext
:
Observable<T> a, b, c;
Observable.zip(
a,
b.onErrorResumeNext(t -> Observable.just(null)),
c.onErrorResumeNext(t -> Observable.just(null)),
(x, y, z) -> <your aggregation>)
...
用 null
表示错误的可观察值由您决定。您还可以使用可选
:
Observable.zip(
a,
b.map(x -> Optional.of(x))
.onErrorResumeNext(t -> Observable.just(Optional.empty())),
c.map(x -> Optional.of(x))
.onErrorResumeNext(t -> Observable.just(Optional.empty())),
(x, y, z) -> <your aggregation>)
如果 b
和 c
对应于外部服务调用,如果它们花费的时间太长,您可能还希望通过将 b
替换为 b.timeout(5, TimeUnit.SECONDS)
。简洁啊!
关于rx-java - 具有优先级的 RxJava Zip,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38576106/