在下面的代码中,我尝试演示 concatMap 如何保留事件的顺序,即使对事件执行的操作未按顺序完成也是如此。
现在我得到的错误是
delayerObservable.complete() is not a function
这基本上只是从教程中摘录的。我调用 next(),然后调用 complete()。它应该有效,至少我是这么认为的。
我可以通过返回 randomDelayer.first() 实现所需的功能
return randomDelayer.first()
但我想从内部完成 observable,因为我可能想发送更多的事件,而不仅仅是一个。
const myTimer = Rx.Observable.create((observer) => {
let counter = 0;
setInterval( () => {
observer.next(counter++);
console.log('called next with counter: '+ counter);
},2000);
});
const myRandomDelayer = myTimer.concatMap( (value) => {
const randomDelayer = Rx.Observable.create( (delayerObservable) => {
const delay = Math.floor(Math.random()*1000);
setTimeout(() => {
console.log(delayerObservable);
delayerObservable.next('Hello, I am Number ' + value + ' and this was my delay: ' + delay);
delayerObservable.complete(); // <<-- this does not work (not a function)
}, delay);
});
return randomDelayer;
});
myRandomDelayer.subscribe( (message) => {
console.log(message);
});
最佳答案
rxjs-framework 版本 4 和 6 之间似乎有不少变化。有缺陷源的工作版本是这样的:
const { Observable } = rxjs;
const { map, filter, concatMap, pipe } = rxjs.operators;
console.log('Starting....');
const myTimer = Observable.create((observer) => {
let counter = 0;
setInterval( () => {
counter++;
if (counter < 10){
console.log('nexting now with counter ' + counter);
observer.next(counter);
} else {
observer.complete();
}
},1000);
});
const myRandomDelayer = myTimer.pipe(
concatMap( (value) => {
const randomDelayer = Observable.create( (delayerObservable) => {
const delay = Math.floor(Math.random()*5000);
setTimeout(() => {
delayerObservable.next('Hello, I am Number ' + value + ' and this was my delay: ' + delay);
delayerObservable.complete();
}, delay);
});
return randomDelayer;
})
);
myRandomDelayer.subscribe( (message) => {
console.log(message);
});
关于javascript - rxjs: observable.complete 不是函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53538516/