我有以下运算符:
const prepare = (value$: Observable<string>) =>
value$.pipe(tap((x) => console.log("remove: ", x)), share());
const performTaskA = (removed$: Observable<string>) =>
removed$.pipe(tap((x) => console.log("pathA: ", x)),);
const performTaskB = (removed$: Observable<string>) =>
removed$.pipe(tap((x) => console.log("pathB: ", x)));
我这样调用它们:
const prepared$ = value$.pipe(prepare);
const taskADone$ = prepared$.pipe(performTaskA);
const taskBDone$ = prepared$.pipe(performTaskB);
merge(taskADone$, taskBDone$).subscribe();
由于 prepare
中的 share
,我希望“remove”仅记录一次,但它出现了两次。
为什么这不起作用?
代码沙箱:https://codesandbox.io/s/so-remove-fires-twice-iyk12?file=/src/index.ts
最佳答案
发生这种情况是因为您的源 Observable 是 of()
,它只发出一个 next
通知,然后发出 complete
。 RxJS 中的所有内容都是同步的,除非您与时间一起工作或故意使代码异步(例如,使用 Promise.resolve 或使用 asyncScheduler)。
在您的演示中,share()
立即收到一个 next
和一个 complete
通知,这会使其内部状态重置。它还会取消订阅其源 Obserable,因为不再有观察者(您要合并的第二个源 taskBDone$
尚未订阅)。然后 taskBDone$
被合并到链中,并且 share()
在内部创建 Subject
的新实例,并且重复整个过程。
这些是share()
中的相关部分:
- 从源 https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/share.ts#L120 接收到
完整
后触发处置处理程序 - 已创建新主题:https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/share.ts#L113
share()
重置其状态 https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/share.ts#L163
因此,如果您的源要同步,您应该更好地使用 shareReplay()
(而不是 share()
),它只会重播整个事件序列每个新观察者。
您更新的演示:https://stackblitz.com/edit/rxjs-jawajw?devtoolsheight=60
请注意,在您的演示中,如果您使用 of("TEST").pipe(delay(0))
作为源 Observable,它将按您的预期工作,因为 delay(0 )
将强制异步行为,两个源 Observables 将首先订阅,然后在另一个 JavaScript 框架中发出它们的 next
和 complete
。
关于rxjs - 为什么共享运算符不阻止可观察的触发两次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67565774/