java - 连接 "n"个观察者后 refCount : call underlying ConnectObservable. connect()

标签 java reactive-programming observable rx-java

我正在尝试从源可观察量将其拆分为两个可观察量。然后压缩这个新闻 observables 。

              |
         +--- A ---+
         |         |
         V         V
         B         C
         |         |
         +-> zip <-+
              |
              V

From a code point of view :

public Observable<Integer> doTheDev(Observable<Integer> A) {

    // share() == publish().refCount();
    Observable<Integer> bridge = A.share().subscribeOn(Schedulers.computation());

    Observable<Integer> B = bridge.count();
    Observable<Integer> C = bridge.sum();

    return Observable.zip(B, C, (b, c) -> b + c);
}

作为RxJava documentation show :一旦第一个 Observable 订阅,refCount() 就会在底层 ConnectableObservable 上调用 connect()

是否可以通过某种方式等待第 n 个可观察对象订阅调用 connect() ? (然后不要错过 Activity )

最佳答案

我尝试了一种方法来做到这一点。目前还不知道是否存在更好、更简单的解决方案。

我编写了一个新的运算符,其功能与 refCount 完全相同(事实上,我复制了 refCount 源代码...),但是,它只连接第 n 个订阅者,而不是第一个订阅者。

我只在新运算符的 call 方法中更改这部分代码:

if (id >= numberOfSub) {
    connect(t);
}

因此,在我的应用程序中,我可以按如下方式使用它:

private static Observable<Double> averageOf(Observable<Double> data, Scheduler scheduler) {

    ConnectableObservable<? extends Double> hot = data
            .publish();

    // call the underlying connect() after the 2nd subscription
    Observable<Double> bridge = Observable.create(new OnSubscribeRefCountN<>(hot, 2))
            .subscribeOn(scheduler);

    Observable<Double> count = bridge.count()
            .map(Integer::doubleValue);

    Observable<Double> sum = bridge.reduce(0.0, (seed, e) -> seed + e);

    return Observable.zip(sum, count, (s, c) -> s / c);
}

您可以在此处找到OnSubscribeRefCountN代码:https://github.com/bric3/demo-rxjava-humantalk/blob/d6227559b36fcafc08c0d8b894584400694d9a15/src/main/java/demo/humantalk/rxjava/operators/OnSubscribeRefCountN.java#L69

关于java - 连接 "n"个观察者后 refCount : call underlying ConnectObservable. connect(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26426269/

相关文章:

java - 如何在部署 OSGI 包时修复缺失的需求

java - 二叉树的层序遍历有哪些常见应用?

java - 多线程环境下的 mkdirs() 函数

java - 如何设置 RollingFileAppender 的最大总磁盘大小?

python - 为什么 RxPY Observable 充当无限可迭代对象?

java - 响应式(Reactive)编程是否受限于函数式编程?

javascript - RxJS Observable 在一些异步操作后触发 onCompleted

rx-java - 如何在响应式(Reactive)编程中抑制onComplete? (RxJava)

java - RxJava2 observable take 抛出 UndeliverableException

scala - Observable 需要无限量的内存?