rxjs - 检查publishReplay().refCount()是否有观察者

标签 rxjs

我这样定义一个 Observable:

const obs$ = Observable.create(...)
  .publishReplay(1)
  .refCount();

这样它就会在我的源 Observable 和所有观察者之间放置一个 ReplaySubject(1)。

由于 ReplaySubject 的状态具有观察者数量(通过其 observers 数组属性),因此如何从 obs$ 访问 ReplaySubject?

我实际上只需要知道 obs$ 是否有观察者。 RxJS4 有 a hasObservers() method在主题上,但它在 RxJS5 中被删除。如何使用 RxJS5 实现此目的?

最佳答案

不确定您的用法,但根据我的需要,我创建了一个自定义运算符,它允许我根据 refCount 的状态透明地执行副作用(类似于点击)。它只是进行传递订阅并鸭子打洞 sub/unsub。回调获取当前 refCount 和前一个,以便您可以了解状态和方向。我喜欢为此使用运算符,因为我可以将其插入流中的任何位置。如果您只是想要一个二进制输出来确定是否有任何订阅,则可以轻松对其进行修改。

const { Observable, Observer, interval } = rxjs;
const { publishReplay, refCount } = rxjs.operators;

const tapRefCount = (onChange) => (source) => {
  let refCount = 0;

  // mute the operator if it has nothing to do
  if (typeof onChange !== 'function') {
    return source;
  }
  // mute errors from side-effects
  const safeOnChange = (refCount, prevRefCount) => {
    try {
      onChange(refCount, prevRefCount);
    } catch (e) {
    }
  };
  
  // spy on subscribe
  return Observable.create((observer) => {
    const subscription = source.subscribe(observer);
    const prevRefCount = refCount;
    refCount++;
    safeOnChange(refCount, prevRefCount);
    
    // spy on unsubscribe
    return () => {
      subscription.unsubscribe();
      const prevRefCount = refCount;
      refCount--;
      safeOnChange(refCount, prevRefCount);
    };
  });
};

const source = interval(1000).pipe(
  publishReplay(1),
  refCount(),
  tapRefCount((refCount, prevRefCount) => { console.log('refCount', refCount, prevRefCount > refCount ? 'down': 'up'); })
);

const firstSub = source.subscribe((x) => { console.log('first', x); });
let secondSub;
setTimeout(() => {
  secondSub = source.subscribe((x) => { console.log('second', x); });
}, 1500);
setTimeout(() => {
  firstSub.unsubscribe();
}, 4500);
setTimeout(() => {
  secondSub.unsubscribe();
}, 5500);
<script src="https://unpkg.com/rxjs@rc/bundles/rxjs.umd.min.js"></script>

typescript 版本:

import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';

export const tapRefCount = (
  onChange: (refCount: number, prevRefCount: number) => void
) => <T>(source: Observable<T>): Observable<T> => {
  let refCount = 0;

  // mute the operator if it has nothing to do
  if (typeof onChange !== 'function') {
    return source;
  }

  // mute errors from side-effects
  const safeOnChange = (refCount, prevRefCount) => {
      try {
        onChange(refCount, prevRefCount);
      } catch (e) {
    }
  };

  // spy on subscribe
  return Observable.create((observer: Observer<T>) => {
    const subscription = source.subscribe(observer);
    const prevRefCount = refCount;
    refCount++;
    safeOnChange(refCount, prevRefCount);

    // spy on unsubscribe
    return () => {
      subscription.unsubscribe();
      const prevRefCount = refCount;
      refCount--;
      safeOnChange(refCount, prevRefCount);
    };
  }) as Observable<T>;
};

关于rxjs - 检查publishReplay().refCount()是否有观察者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49976825/

相关文章:

Rxjs Observable.take(1) 与 Subscription.unsubscribe()

javascript - 使用 rxjs 创建一个将在稍后连接到 Web 套接字的可观察对象

angular - 来自 Angular Route Guard 的 Web Api 调用

javascript - 用响应式编程重写事件发射器(信号量示例)

Angular 2 试图在服务中实现 Observables

javascript - Angular subject() 可观察,在不同的组件中不起作用

javascript - 订阅/取消订阅 Rxjs Subject 时如何接收通知

javascript - 如何检查数组中的id是否出现在另一个数组中,然后将其推送到新数组

rxjs - RxJS 中通知的用例是什么?

angular - 如何使用 Angular 错误处理获取出现在控制台上的 CORS 错误?