所以我有这个可观察的管道,我需要在订阅开始时做一次操作,就像你可以使用 finalize()
在订阅结束时执行一次操作
所以这就是我开始的方式,不幸的是它会每次启动一次 next()
对主题发出的调用。
const notificationSubject = new BehaviorSubject<Notification | undefined>(undefined);
const notifications$ = this.notificationSubject.pipe(
tap(() => startup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
然后我们得到了这个变体: let isStartedUp = false;
const internalStartup = () => {
if(!isStartedUp){
isStartedUp = true;
startup();
}
}
const notifications$ = notificationSubject.pipe(
tap(() => internalStartup()),
filter(isValueDefined),
finalize(() => shutdown())
);
notifications$.subscribe(noti => foo(noti));
notifications$.subscribe(noti => bar(noti));
...这是它的工作,但是它做得有点太好了,因为现在启动只进行一次(并且仅在第一次订阅时),而不是每个订阅创建一次。我想有一些类似的东西,但我还没有找到。
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);
最佳答案
您可以使用 defer
在每次订阅时执行一些代码。
export function initialize<T>(initializer: () => void): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => defer(() => {
initializer();
return source;
});
}
const notifications$ = notificationSubject.pipe(
initialize(() => startup()),
finalize(() => shutdown())
);
关于javascript - 如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63899782/