我正在尝试从源可观察量将其拆分为两个可观察量。然后压缩这个新闻 observables 。
| +--- A ---+ | | V V B C | | +-> zip <-+ | V
From a code point of view :
public Observable<Integer> doTheDev(Observable<Integer> A) {
// share() == publish().refCount();
Observable<Integer> bridge = A.share().subscribeOn(Schedulers.computation());
Observable<Integer> B = bridge.count();
Observable<Integer> C = bridge.sum();
return Observable.zip(B, C, (b, c) -> b + c);
}
作为RxJava documentation show :一旦第一个 Observable 订阅,refCount()
就会在底层 ConnectableObservable
上调用 connect()
。
是否可以通过某种方式等待第 n 个可观察对象订阅调用 connect()
? (然后不要错过 Activity )
最佳答案
我尝试了一种方法来做到这一点。目前还不知道是否存在更好、更简单的解决方案。
我编写了一个新的运算符,其功能与 refCount 完全相同(事实上,我复制了 refCount 源代码...),但是,它只连接第 n 个订阅者,而不是第一个订阅者。
我只在新运算符的 call
方法中更改这部分代码:
if (id >= numberOfSub) {
connect(t);
}
因此,在我的应用程序中,我可以按如下方式使用它:
private static Observable<Double> averageOf(Observable<Double> data, Scheduler scheduler) {
ConnectableObservable<? extends Double> hot = data
.publish();
// call the underlying connect() after the 2nd subscription
Observable<Double> bridge = Observable.create(new OnSubscribeRefCountN<>(hot, 2))
.subscribeOn(scheduler);
Observable<Double> count = bridge.count()
.map(Integer::doubleValue);
Observable<Double> sum = bridge.reduce(0.0, (seed, e) -> seed + e);
return Observable.zip(sum, count, (s, c) -> s / c);
}
您可以在此处找到OnSubscribeRefCountN
代码:https://github.com/bric3/demo-rxjava-humantalk/blob/d6227559b36fcafc08c0d8b894584400694d9a15/src/main/java/demo/humantalk/rxjava/operators/OnSubscribeRefCountN.java#L69
关于java - 连接 "n"个观察者后 refCount : call underlying ConnectObservable. connect(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26426269/