java - RxJava2 - 来自 concat 的最后一个成功的 Single

标签 java observable reactive-programming rx-java2

我有 N 个单曲源,我想将这些源连接起来并获得最新的成功单曲。例如:

数据库单:

--[A]--|->

API 单一:

---X--->

Single.concat(数据库,API):

--[A]--|->

否则:

数据库单:

--[A]--|->

API 单一:

--[B]--|->

Single.concat(数据库,API):

--[B]--|->

这可能吗?我阅读了文档,但没有找到类似“lastSuccessfullOrError()”方法的内容。我尝试了“elementAt”、“lastOrError”等,但它们的行为不是我想要的

谢谢

最佳答案

这可以通过以下方式实现:

public static <T> Single<T> latestSuccess(Single<T>... sources) {
     return Single.defer(() -> {
         AtomicReference<T> last = new AtomicReference<T>();
         return Observable.fromArray(sources)
             .concatMap(source ->
                  source.doOnSuccess(last::lazySet)
                  .toObservable()
                  .onErrorResumeNext(Observable.empty())
             )
             .ignoreElements()
             .andThen(Single.fromCallable(() -> {
                 if (last.get() == null) {
                     throw new NoSuchElementException();
                 }
                 return last.get();
             }));
     });
}

关于java - RxJava2 - 来自 concat 的最后一个成功的 Single,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48508391/

相关文章:

Java/Eclipse - 确定可能的异常而不强制它们?

java - 通过 map 流进行过滤并收集到单个列表中?

javascript - Angular 2 : Synching two post requests

Java - 响应式(Reactive)编程与事件监听器

java - 处理中的 OpenCV pyrMeanShiftFilter——矩阵问题

java - MySQL 服务器 5.1.72 的正确 JDBC 版本是什么?

java - mockito - 伪造 addObserver

angular - 如何在订阅期间检查 Angular 5 RxJS Observable 的 NULL 结果?

system.reactive - 使用Rx运行直方图流

android - 在 RxJava 中的 TimeoutException 上恢复 Flowable