javascript - 如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑

标签 javascript rxjs reactivex

所以我有这个可观察的管道,我需要在订阅开始时做一次操作,就像你可以使用 finalize()在订阅结束时执行一次操作
所以这就是我开始的方式,不幸的是它会每次启动一次 next()对主题发出的调用。

    const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
    const notifications$ = this.notificationSubject.pipe(
      tap(() => startup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));
然后我们得到了这个变体:
    let isStartedUp = false;
    const internalStartup = () => {
      if(!isStartedUp){
        isStartedUp = true;
        startup();
      }
    }

    const notifications$ = notificationSubject.pipe(
      tap(() => internalStartup()),
      filter(isValueDefined),
      finalize(() => shutdown())
    );

   notifications$.subscribe(noti => foo(noti));
   notifications$.subscribe(noti => bar(noti));
...这是它的工作,但是它做得有点太好了,因为现在启动只进行一次(并且仅在第一次订阅时),而不是每个订阅创建一次。
我想有一些类似的东西,但我还没有找到。
const notifications$ = notificationSubject.pipe(
      initialize(() => startup()),
      finalize(() => shutdown())
    );

最佳答案

您可以使用 defer 在每次订阅时执行一些代码。

export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    initializer();
    return source;
  });
}
const notifications$ = notificationSubject.pipe(
  initialize(() => startup()),
  finalize(() => shutdown())
);

关于javascript - 如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63899782/

相关文章:

javascript - 为什么这个对象没有被垃圾回收?

javascript - 在 Angular 2 中进行 http 调用后返回热/共享可观察值的最佳方法是什么

swift - 在 RxSwift 中合并两个通知观察者

android - 如何基于 Observable 返回值对 Presenter 方法进行单元测试?

angular - 使用 Observable<void> 或 Observable<any> 发出 `null` 值?

javascript - jquery onchange 事件不适用于在 php 中上传文件

javascript - 如何滑动div,勾选输入复选框

javascript - echarts中点击第一个图例时如何禁用其他图例

javascript - 更改超时时间

angular - 如何一个接一个地执行函数?