java - RxJava 异步缓存 : proper way to dispose replay(). autoConnect() Observable

标签 java rx-java observable

我必须为可观察结果提供一个短期缓存。

查看选项,我看到以下内容:

  1. 缓存 replay(1).refCount()当数据准备好时,缓存实际值。 缓存检索将检查实际数据并执行 Observable.just或返回 挂起的 Observable 或发起新的请求。

  2. 缓存 replay(1).autoConnect(1)并始终返回那个

后者看起来更直接,但它有一个警告,即当缓存必须失效时如何正确处理 observable。

有一个签名:

public Observable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)

但很难说我如何跟踪未完成的订阅,以及处理是否妥当。

前者将负责资源释放,但您必须产生更复杂的逻辑。

最佳答案

为什么不是 .cache()

public class CachedObservable<K,V> {
  private Function<K, Observable<V>> actual;
  private CachedObservable(Function<K, Observable<V>> actual){this.actual=actual;}
  private final Map<K, Observable<V>> cacheMap = new ConcurrentHashMap<>();

  public Observable<V> get(K key) {
    return cacheMap.computeIfAbsent(key, k -> this.actual.call(k).cache());
  }
  public void invalidate(K key){cacheMap.remove(key);}
}

关于java - RxJava 异步缓存 : proper way to dispose replay(). autoConnect() Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42658732/

相关文章:

Java Scanner - 下一个 String 直到 |被发现

java - 在 hibernate 中使用 jpa 时获取 hibernate.properties not found 和 java.lang.NoClassDefFoundError

android - 使用改造和 Rx android 进行单元测试

node.js - 检测可观察量何时连续两次发出相同的值

java - 使用 Cucumber、Jmeter 和 FailSafe 的自动化框架是否需要 ThreadLocal?

java - 将具有指定泛型列表类型的 Observable 作为方法参数传递

android - 覆盖 RxAndroid 调度程序以进行测试

scala - 创建 rx Observable 后添加元素

promise - 为什么 switchMap 运算符在与 Promise 一起使用时仅发出最后一个值?

java - 设计中间数据文件的格式?