java - 按需执行热 Observable

标签 java reactive-programming rx-java reactive-streams

举个冷门例子:

Observable<Integer> cold = Observable.create(subscriber -> {
  try {
    for (int i = 0; i <= 42; i++) {

      // avoid doing unnecessary work
      if (!subscriber.isUnsubscribed()) {
        break;
      }

      subscriber.onNext(i);
    }
    subscriber.onCompleted();
  } catch (Throwable cause) {
    subscriber.onError(cause);
  }
});

它开始为每个新订阅者从头开始执行:

// starts execution
cold.subscribe(...)

如果订阅者提前取消订阅,可以停止执行:

// stops execution
subscription.unsubscribe();

现在,如果我们有一些实际的业务逻辑在进行(不需要为每个订阅者重放而是实时的)而不是循环示例,那么我们正在处理热可观察...

PublishSubject<Integer> hot = PublishSubject.create();

Thread thread = new Thread(() -> {
  try {
    for (int i = 0; i < 42; i++) {
      // how to avoid unnecessary work when no one is subscribed?
      hot.onNext(i);
    }
    hot.onCompleted();
  } catch (Throwable cause) {
    hot.onError(cause);
  }
});

当我们希望它开始时,我们可能会这样做

// stats work (although no one is subscribed) 
thread.start();

因此第一个问题:如何仅在第一个观察者订阅时才开始工作?(也许是可连接的可观察者?)

和重要的问题:当最后一个订阅者取消订阅时如何停止工作?(我不知道如何访问该主题的当前订阅,并且想找到没有共享全局状态的干净解决方案,如果存在这样的解决方案)

我能想到的一个解决方案是使用将管理订阅者的自定义操作符提升主题...

最佳答案

参见运算符 refCount - http://reactivex.io/documentation/operators/refcount.html .此 Operator 将您的 Observable 转换为 ConnectableObservable,并在第一个订阅者订阅时连接它,并在没有更多订阅时断开连接

关于java - 按需执行热 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34462750/

相关文章:

java - Android RxJava Observable.interval() 不会停止发射项目

java - 我怎样才能只订阅最后的数据?

java - 如何从接口(interface)获取枚举到实现该接口(interface)的类?

java - 更改应用程序语言

java - 将 ArrayList<String> 添加到 ArrayList<ArrayList<String>>

java - 如何根据发出的事件有条件地缓冲分组的 Observable/Flux?

java - Rxjava重试时立即调用

java - $_POST 未从 Java 代码接收数据

objective-c - ReactiveCocoa 0.10.0中的selectMany在哪里

android - RxJava 运算符类似于 replay(1, 1, MINUTE),但在 1 分钟后重新订阅