rxjs - 为什么共享运算符不阻止可观察的触发两次?

标签 rxjs

我有以下运算符:

  const prepare = (value$: Observable<string>) =>
    value$.pipe(tap((x) => console.log("remove: ", x)), share());

  const performTaskA = (removed$: Observable<string>) =>
    removed$.pipe(tap((x) => console.log("pathA: ", x)),);

  const performTaskB = (removed$: Observable<string>) =>
    removed$.pipe(tap((x) => console.log("pathB: ", x)));

我这样调用它们:

  const prepared$ = value$.pipe(prepare);
  const taskADone$ = prepared$.pipe(performTaskA);
  const taskBDone$ = prepared$.pipe(performTaskB);

  merge(taskADone$, taskBDone$).subscribe();

由于 prepare 中的 share,我希望“remove”仅记录一次,但它出现了两次。

为什么这不起作用?

代码沙箱:https://codesandbox.io/s/so-remove-fires-twice-iyk12?file=/src/index.ts

最佳答案

发生这种情况是因为您的源 Observable 是 of(),它只发出一个 next 通知,然后发出 complete。 RxJS 中的所有内容都是同步的,除非您与时间一起工作或故意使代码异步(例如,使用 Promise.resolve 或使用 asyncScheduler)。

在您的演示中,share() 立即收到一个 next 和一个 complete 通知,这会使其内部状态重置。它还会取消订阅其源 Obserable,因为不再有观察者(您要合并的第二个源 taskBDone$ 尚未订阅)。然后 taskBDone$ 被合并到链中,并且 share() 在内部创建 Subject 的新实例,并且重复整个过程。

这些是share()中的相关部分:

因此,如果您的源要同步,您应该更好地使用 shareReplay() (而不是 share()),它只会重播整个事件序列每个新观察者。

您更新的演示:https://stackblitz.com/edit/rxjs-jawajw?devtoolsheight=60

请注意,在您的演示中,如果您使用 of("TEST").pipe(delay(0)) 作为源 Observable,它将按您的预期工作,因为 delay(0 ) 将强制异步行为,两个源 Observables 将首先订阅,然后在另一个 JavaScript 框架中发出它们的 nextcomplete

关于rxjs - 为什么共享运算符不阻止可观察的触发两次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67565774/

相关文章:

reactjs - rxjs 中的嵌套和扁平化有什么区别?

angular - 在ngrx效果中实现catchError后全局ErrorHandler不起作用

redux - 使用惯用的 redux observable 排队和取消事件

typescript - 使用闭包而不是 `thisArg` ? .pipe( map (

javascript - 如何使用 Rxjs switchMap 抛出错误

angular - 为什么我会收到 "Subscribe is not a function"错误?

javascript - 在 Angular2 NGmodule 中。我想根据条件加载 2 组不同的路由模块

angular - 在 Angular 中导入 RxJS 运算符时出错

javascript - RxJs:在 flatMapLatest 完成后访问 flatMapLatest 之前的数据

angular - RXJS - Observable.do 不工作