rx-java - 具有优先级的 RxJava Zip

标签 rx-java reactive-programming

问题就在这里。我有许多异步操作,其结果被聚合成一个并进一步处理的操作。但是,并非所有操作都是相同的,并且根据失败的操作,错误处理也有所不同。

详细来说,假设我们有操作 A、B 和 C。如果 A 失败,我们需要结束处理,但如果 B 或 C 失败,我们继续正常处理其他操作。

目前,我们使用倒计时锁存器和大量状态管理来实现这一目标,需要近一百行代码。我想将其转移到基于 RxJava 的实现。我的第一个想法是尝试 Observable.zip 运算符,但这将所有可观察值视为相等,但在本例中并非如此。我的另一个想法是链接调用,这是可行的,但这意味着所有操作不会同时启动,从而导致整体时间更长。

谁能指导我如何实现这一目标?

最佳答案

使用.onErrorResumeNext:

Observable<T> a, b, c;
Observable.zip(
    a,
    b.onErrorResumeNext(t -> Observable.just(null)),
    c.onErrorResumeNext(t -> Observable.just(null)),
   (x, y, z) -> <your aggregation>)
...

null 表示错误的可观察值由您决定。您还可以使用可选:

Observable.zip(
    a,
    b.map(x -> Optional.of(x))
     .onErrorResumeNext(t -> Observable.just(Optional.empty())),
    c.map(x -> Optional.of(x))
     .onErrorResumeNext(t -> Observable.just(Optional.empty())),
   (x, y, z) -> <your aggregation>)

如果 bc 对应于外部服务调用,如果它们花费的时间太长,您可能还希望通过将 b 替换为 b.timeout(5, TimeUnit.SECONDS)。简洁啊!

关于rx-java - 具有优先级的 RxJava Zip,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38576106/

相关文章:

java - StringObservable.from(InputStream).share() 立即导致 MissingBackPressure

kotlin - 如何使用重试只有3次就放弃

java - 使用 RxJava 链两个改造 observables

angular - 如何将 Observable 数组合并为具有相同返回类型的单个 Observable

javascript - next() 到中间 Observable

retrofit - 是否可以同步运行可观察的改造?

rx-java - Kotlin通配符方法

java - 聚合所有发出的流并使用 rxJava 立即发出它们

javascript - 如何观察对象的方法返回的属性变化

javascript - 用响应式编程重写事件发射器(信号量示例)