angular - 一种将数据从一个可观察对象传递到另一个可观察对象的更好方法

标签 angular rxjs observable

我创建了两个用于测试 Observables 的函数,每个函数都返回一个 Observable:

foo() {
  return new Observable(observer => {
    let i = 0;
    setInterval( () => {
      if(i === 10) {
        observer.complete();
      } else {
        observer.next(i);
        i++;
      }
    }, 1000);
    // If I call observer.complete() here then it never completes
  });
}

bar(fooResult) {
  return new Observable(observer => {
    let j = 0;
    setInterval( () => {
      if(fooResult) {
        observer.next('j -> '+j+' plus fooResult '+JSON.stringify(fooResult));
        observer.complete();
      } else {
        observer.next(j);
        j++;
      }
    }, 2000);
  });
}

并像这样使用它们:

    let fooResult = [];

    // Testing observables...
    this.exampleProductService.foo().subscribe(
      (res) => { 
        console.log('foo next() -> '+res);
        fooResult.push(res);
      },
      (err) => {
        console.error('foo error: '+JSON.stringify(err));
      },
      () => { 
        console.log('foo finished');
        this.exampleProductService.bar(fooResult).subscribe(
          (res) => {
            console.log('bar next() -> '+res);
          },
           (err) => {
            console.error('bar error: '+JSON.stringify(err));
          },
          () => {
            console.log('bar finished');
          }
        );
      }
    );

提出的问题是:

  1. 是否有更好的方法将数据从一个 Observable 的完成传递到另一个也返回一个 Observable 的函数?建立一个数组似乎很麻烦,我无法执行以下操作,因为 Observablecomplete callback 部分没有传递像 progressUpdate 这样的参数> 和 onError:

    (complete) => { this.exampleProductService.bar(complete).// rest of code } 
    
  2. 我尝试将第一个函数的结果分配给一个变量,然后传递该变量,但正如预期的那样,我得到了一个 Observable 而不是我想要的结果。

  3. 我的上述做法有什么不正确的地方吗?

谢谢

附言这是一个 Angular 2 应用程序!

最佳答案

我认为您的功能有点过于复杂。一方面,当工厂函数已经可用时不要使用构造函数,在这种情况下是 intervaltimer 正如@Meir 指出的那样,尽管在这种情况下它会是更详细。

其次,bar 函数实际上并没有多大意义,因为您似乎在等待某些您已经知道已经完成的事情完成(因为您没有订阅直到 foo 生成的 Observable 完成 block 。

我根据您声明的目标进行了重构,即等待一个 Observable 在开始第二个之前完成,同时在第二个中使用第一个的结果。

// Factory function to emit 10 items, 1 every second
function foo() {
  return Observable.interval(1000)
    .take(10);
}

// Lifts the passed in value into an Observable and stringfys it
function bar(fooResult) {
  return Rx.Observable.of(fooResult)
    .map(res => JSON.stringify(fooResult))
}

现在当使用它们时你会这样做:

foo()
  // Log the side effects of foo
  .do(
    x => console.log(`foo next() -> ${x}`),
   err => console.error(`foo error: ${JSON.stringify(err)}`),
   () => console.log('foo finished')
  )
  // Collect the results from foo and only emit when foo completes
  .reduce((total, diff) => [...total, diff], [])

  // Pass the result from the reduce on to bar
  .concatMap(fooResult => bar(fooResult))

  //Subscribe to the results of bar
  .subscribe(
    res => console.log(`bar next() -> ${res}`),
    err => console.error(`bar error: ${JSON.stringify(err)}`),
    ()  => console.log('bar finished')
  );

注意上面我也摆脱了全局状态,这是函数式编程的诅咒。在可能的情况下,您的状态应该本地化到流中。

请参阅此处的工作示例:http://jsbin.com/pexayohoho/1/edit?js,console

关于angular - 一种将数据从一个可观察对象传递到另一个可观察对象的更好方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39851978/

相关文章:

angular - 如何在 mat-tab 中启用 routerLink

angular - 在 Typescript 中定义一个属性以防止 Angular 中的编译错误

angular - Angular 2 中的 routerLink 显示为纯文本

angular - 在带有异步管道的 *ngFor 中使用带有 http-call 的函数时无限循环

javascript - 获取所有鼠标点击的 Rx 流的可移植方法

angular - 给 BehaviorSubject 初始值 - 值是一个 Observable

带参数的 Angular 2 路由器标题

angular - 我应该取消订阅组件中定义的可观察值吗?

javascript - Redux 可观察到的 : dispatch action on multiple clicks (two or more)

Angular 6+ 类型错误 : Cannot read property 'firstName' of undefined