reactive-programming - 如何使用/控制 RxJava Observable.cache

标签 reactive-programming rx-java2 rx-android

我正在尝试使用 RxJava 缓存机制( RxJava2 ),但我似乎无法理解它是如何工作的,或者我如何控制缓存的内容,因为有 cache运算符(operator)。
我想在发出新数据之前用一些条件验证缓存的数据。
例如

someObservable.
repeat().
filter { it.age < maxAge }.
map(it.name).
cache() 

我如何检查和过滤缓存值并在它成功时发出它,否则我将请求一个新值。
由于该值会定期更改,因此我需要先验证缓存是否仍然有效,然后才能请求新缓存。
还有ObservableCache<T>类,但我找不到任何使用它的资源。
任何帮助将非常感激。谢谢。

最佳答案

这不是重放/缓存的工作方式。请先阅读 #replay/#cache 文档。
重播
这个操作符返回一个 ConnectableObservable,它有一些方法 (#refCount/#connect/#autoConnect) 用于连接到源。
当#replay 在没有过载的情况下应用时,源订阅被多播,并且所有发出的值都将被重播。源订阅是惰性的,可以通过#refCount/#connect/#autoConnect 连接到源。

Returns a ConnectableObservable that shares a single subscription to the underlying ObservableSource that will replay all of its items and notifications to any future Observer.


在没有任何连接方法的情况下应用 #relay (#refCount/#connect/#autoConnect) 不会在订阅时发出任何值

A Connectable ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.


重播(1)#autoConnect(-1)/#refCount(1)/#connect
应用 replay(1) 将缓存最后一个值,并将在每个订阅上发出缓存的值。 #autoConnect 将立即连接打开连接并保持打开状态,直到发生终端事件(onComplete、onError)。 #refCount 是模拟的,但是当所有订阅者都消失时,它会与源断开连接。 #connect 操作符可以用于,当你需要等待时,当对可观察对象完成所有订阅时,为了不丢失值。
用法
#replay(1)——大部分应该在可观察的末尾使用。
sourcObs.
  .filter()
  .map()
  .replay(bufferSize)
  .refCount(connectWhenXSubsciberSubscribed) 
警告
应用没有缓冲区限制或到期日期的#replay 会导致内存泄漏,当您观察到无限时
缓存/cacheWithInitialCapacity
运算符类似于带有 autoConnect(1) 的 #replay。运算符(operator)将缓存每个值并在每个订阅上重放。

The operator subscribes only when the first downstream subscriber subscribes and maintains a single subscription towards this ObservableSource. In contrast, the operator family of replay() that return a ConnectableObservable require an explicit call to ConnectableObservable.connect(). Note: You sacrifice the ability to dispose the origin when you use the cache Observer so be careful not to use this Observer on ObservableSources that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().


例子
    @Test
    fun skfdsfkds() {
        val create = PublishSubject.create<Int>()

        val cacheWithInitialCapacity = create
            .cacheWithInitialCapacity(1)

        cacheWithInitialCapacity.subscribe()

        create.onNext(1)
        create.onNext(2)
        create.onNext(3)

        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
        cacheWithInitialCapacity.test().assertValues(1, 2, 3)
    }
用法
使用缓存操作符,当你无法控制连接阶段时

This is useful when you want an ObservableSource to cache responses and you can't control the subscribe/dispose behavior of all the Observers.


警告
与 replay() 一样,缓存是无界的,可能会导致内存泄漏。

Note: The capacity hint is not an upper bound on cache size. For that, consider replay(int) in combination with ConnectableObservable.autoConnect() or similar.


进一步阅读
https://blog.danlew.net/2018/09/25/connectable-observables-so-hot-right-now/
https://blog.danlew.net/2016/06/13/multicasting-in-rxjava/

关于reactive-programming - 如何使用/控制 RxJava Observable.cache,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62607426/

相关文章:

haskell - 推荐阅读/教程来了解reactive-banana FRP库

android - RxJava 在单个订阅者中观察多个观察者

rx-java2 - 使用 RxJava2 连接字符串

android - 如何同时有效地读取两个 BLE 设备的温度?

android - 如何忽略错误并继续无限流?

javascript - 如何在 RxJS 中通过 ID 去抖

c# - Observable.Range 是否违反了 Observable 契约?

java - 使用 slf4j 的 Project Reactor 非阻塞日志记录

java - 没有参数的 Lambda 函数 - 无法推断功能接口(interface)类型

android - 如何在 RxAndroid 2.0 中正确过滤/计数?