在 Rxjs 中,有管道 takeUntil 但没有管道 wait Until,这使得当前可观察对象等待其次可观察到发射。
我的最终目标是让许多 Observable 仍在等待,直到我的 Observable init$ 仅发出一个值,以继续执行。因此,我的 Observable init$ 必须执行一次,并且在此之前我的其他 observable 必须等待,直到 inits 发出任何不同于 null 的值。
在这个简单的示例中,我想向 pipedSource 添加一个管道,执行 wait Until init$ ,因此源必须等到init$ 发出它的值。
import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
skipWhile((res)=> res === null)
//wait until init$ emits a non null value
)
//first subscription to source
pipedSource.subscribe(val => console.log(val));
source.next({profile:"me"});
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log(val));
},3000);
想要的结果:
//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile"
最佳答案
当第一个可观察值发出非空值时,您希望运行第二个可观察值。为此,请在 skipWhile
之后使用 concatMap
或 switchMap
。
ngOnInit() {
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject({profile:"me"});
const pipedSource = init$
.pipe(
skipWhile((res)=> res === null),
concatMap(() => source)
);
pipedSource.subscribe(val => console.log('first', val));
//init emits once
setTimeout(()=>{
init$.next(1);
},2000);
// a second subscription to source
setTimeout(()=>{
pipedSource.subscribe(val => console.log('second', val));
}, 3000);
}
在这里,我首先订阅 init$
observable,等待它发出非空值,然后切换到 source
observable。
关于angular - 等待直到第二个 Observable 发出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60783410/