我这样定义一个 Observable:
const obs$ = Observable.create(...)
.publishReplay(1)
.refCount();
这样它就会在我的源 Observable 和所有观察者之间放置一个 ReplaySubject(1)。
由于 ReplaySubject 的状态具有观察者数量(通过其 observers
数组属性),因此如何从 obs$
访问 ReplaySubject?
我实际上只需要知道 obs$
是否有观察者。 RxJS4 有 a hasObservers() method在主题上,但它在 RxJS5 中被删除。如何使用 RxJS5 实现此目的?
最佳答案
不确定您的用法,但根据我的需要,我创建了一个自定义运算符,它允许我根据 refCount 的状态透明地执行副作用(类似于点击)。它只是进行传递订阅并鸭子打洞 sub/unsub。回调获取当前 refCount 和前一个,以便您可以了解状态和方向。我喜欢为此使用运算符,因为我可以将其插入流中的任何位置。如果您只是想要一个二进制输出来确定是否有任何订阅,则可以轻松对其进行修改。
const { Observable, Observer, interval } = rxjs;
const { publishReplay, refCount } = rxjs.operators;
const tapRefCount = (onChange) => (source) => {
let refCount = 0;
// mute the operator if it has nothing to do
if (typeof onChange !== 'function') {
return source;
}
// mute errors from side-effects
const safeOnChange = (refCount, prevRefCount) => {
try {
onChange(refCount, prevRefCount);
} catch (e) {
}
};
// spy on subscribe
return Observable.create((observer) => {
const subscription = source.subscribe(observer);
const prevRefCount = refCount;
refCount++;
safeOnChange(refCount, prevRefCount);
// spy on unsubscribe
return () => {
subscription.unsubscribe();
const prevRefCount = refCount;
refCount--;
safeOnChange(refCount, prevRefCount);
};
});
};
const source = interval(1000).pipe(
publishReplay(1),
refCount(),
tapRefCount((refCount, prevRefCount) => { console.log('refCount', refCount, prevRefCount > refCount ? 'down': 'up'); })
);
const firstSub = source.subscribe((x) => { console.log('first', x); });
let secondSub;
setTimeout(() => {
secondSub = source.subscribe((x) => { console.log('second', x); });
}, 1500);
setTimeout(() => {
firstSub.unsubscribe();
}, 4500);
setTimeout(() => {
secondSub.unsubscribe();
}, 5500);
<script src="https://unpkg.com/rxjs@rc/bundles/rxjs.umd.min.js"></script>
typescript 版本:
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
export const tapRefCount = (
onChange: (refCount: number, prevRefCount: number) => void
) => <T>(source: Observable<T>): Observable<T> => {
let refCount = 0;
// mute the operator if it has nothing to do
if (typeof onChange !== 'function') {
return source;
}
// mute errors from side-effects
const safeOnChange = (refCount, prevRefCount) => {
try {
onChange(refCount, prevRefCount);
} catch (e) {
}
};
// spy on subscribe
return Observable.create((observer: Observer<T>) => {
const subscription = source.subscribe(observer);
const prevRefCount = refCount;
refCount++;
safeOnChange(refCount, prevRefCount);
// spy on unsubscribe
return () => {
subscription.unsubscribe();
const prevRefCount = refCount;
refCount--;
safeOnChange(refCount, prevRefCount);
};
}) as Observable<T>;
};
关于rxjs - 检查publishReplay().refCount()是否有观察者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49976825/