javascript - rxjs: observable.complete 不是函数

标签 javascript rxjs reactive-programming

在下面的代码中,我尝试演示 concatMap 如何保留事件的顺序,即使对事件执行的操作未按顺序完成也是如此。

现在我得到的错误是

delayerObservable.complete() is not a function

这基本上只是从教程中摘录的。我调用 next(),然后调用 complete()。它应该有效,至少我是这么认为的。

我可以通过返回 randomDelayer.first() 实现所需的功能

return randomDelayer.first()

但我想从内部完成 observable,因为我可能想发送更多的事件,而不仅仅是一个。

const myTimer = Rx.Observable.create((observer) => {
  let counter = 0;
  setInterval( () => {
    observer.next(counter++);    
    console.log('called next with counter: '+ counter);
  },2000);
});

const myRandomDelayer = myTimer.concatMap( (value) => {
    const randomDelayer = Rx.Observable.create( (delayerObservable) => {
        const delay = Math.floor(Math.random()*1000);
        setTimeout(() => {
            console.log(delayerObservable);
            delayerObservable.next('Hello, I am Number ' + value + ' and this was my delay: ' + delay);
            delayerObservable.complete(); // <<-- this does not work (not a function)
        }, delay);
    });
    return randomDelayer;
});

myRandomDelayer.subscribe( (message) => {
    console.log(message);
});

最佳答案

rxjs-framework 版本 4 和 6 之间似乎有不少变化。有缺陷源的工作版本是这样的:

const { Observable } = rxjs;
const { map, filter, concatMap, pipe } = rxjs.operators;

console.log('Starting....');

const myTimer = Observable.create((observer) => {
  let counter = 0;
  setInterval( () => {
      counter++;
      if (counter < 10){
          console.log('nexting now with counter ' + counter);
          observer.next(counter);    
      } else {
          observer.complete();
      }
  },1000);
});

const myRandomDelayer = myTimer.pipe(
    concatMap( (value) => {
        const randomDelayer = Observable.create( (delayerObservable) => {
            const delay = Math.floor(Math.random()*5000);
            setTimeout(() => {
                delayerObservable.next('Hello, I am Number ' + value + ' and this was my delay: ' + delay);
                delayerObservable.complete();
            }, delay);
        });
        return randomDelayer;
    })
);

myRandomDelayer.subscribe( (message) => {
    console.log(message);
});

关于javascript - rxjs: observable.complete 不是函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53538516/

相关文章:

javascript - 在 JavaScript 中检查字母数字的最佳方式

javascript - 如何使用附加属性扩展映射的 knockout View 模型 "on the fly"?

java - 如何将 Observable<Observable<List<T>>> 转换为 Observable<List<T>>

ios - RxSwift 如何有条件地控制按钮何时发出水龙头?

javascript - 合并两个 RxJS 流(基于同步)

javascript - JS - 这里有什么区别?

javascript - JQuery - mousedown、mouseup 并单击同一元素

javascript - 如何解决 RxJS 范围重叠问题?

typescript - RXJS Angular - 使用 .subscribe 和 Observable 进行错误处理

rxjs - 'of' 与 'from' 运算符