rx-java - 如何避免 Rx 流中的重复网络调用?

标签 rx-java system.reactive rx-java2

我有以下流。

Observable.just(Unit) // execute immediately
    .mergeWith(tryAgainRelay) // execute again when this relay emits
    .flatMap {
      api.call() // emits sealed class single containing `Success` or `Error` state
          .toObservable()
          .startWith(Loading) // emit loading (same sealed class as above)
    }

但是,我从不想要重复(同时)的网络调用(内部平面图)。如果 tryAgainRelay 在网络调用已经在进行中时发射,我希望丢弃该发射。我可以使用以下代码避免它们:

var isLoading = false // track whether api call is in flight
Observable.just(Unit)
    .mergeWith(tryAgainRelay)
    .filter { isLoading.not() } // prevent emissions if api call is already in flight
    .flatMap {
      api.call()
          .toObservable()
          .startWith(Loading)
          // update state variable
          .doOnNext {
            isLoading = when (it) {
              Loading -> true
              is Error, is Success -> false
            }
          }
        }

它有点难看,涉及使用 .doOnNext() 步出流。有没有更好、更惯用的方法来做到这一点?

最佳答案

使用 concatMap 而不是 flatMap

重试将依次“排队”创建,因此您可以确保只有一个请求将并行执行(下一个请求将在前一个请求完成时开始 - 完成或错误 - 如果 tryAgainRelay 已发出)。

关于rx-java - 如何避免 Rx 流中的重复网络调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46537839/

相关文章:

java - 迭代元素之间和订阅时的延迟

java - 重构谷歌的 NetworkBoundResource 类以使用 RxJava 而不是 LiveData

c# - 如何根据属性压缩 2 个序列(zip、join)

c# - 我如何断言 Observable 不会推送任何项目?

java - 带有空可观察值列表的 RxJava zip

android - 存储从 web 服务获取的数据后,在 RxJava 中返回订阅者

java - Couchbase 错误 : rx. exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value

java - 使用异步 for 循环调用 RxJava 依赖方法

java - RxJava 异步订阅

android - 在 RxJava(2) 中,有没有办法跟踪谁在主题上调用了 onNext() ?