javascript - RxJS zip 在 forkJoin 时不起作用

标签 javascript angular rxjs reactive-programming

我想要实现的是这个(使用 Angular 2/Typescript):

  • Observable A 产生事件流。

  • 对于 Observable A 的每个事件,进行 8 次不同的 http 调用。 (8 个开关图)

  • 8个请求全部返回后,做点什么(订阅8个switchmaps的zip)。

  • 对 Observable A 的每个事件重复 8 次请求(由 switchmap 和 zip 处理)

代码:(完整代码位于 https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1 )

let source = Observable
.interval(5000)
.take(100);

let requests = [];

for(let i=0; i<8;i++) {
  let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
  request.subscribe(res => console.log(res.json()));
  requests.push(request);
}

Observable.zip(requests)
.subscribe(console.log("All requests completed"));

requests.forEach(r => r.connect());

问题是我的 zip 从未被调用过。我控制台记录了对 8 个开关映射中每一个的订阅,并且我得到的日志显示每次 Observable/stream A 中有事件时,8 个 http 调用成功返回。(也可以在网络选项卡中看到 8 个调用返回调试工具)

但是 zip 从不发出任何东西。


如果我尝试不同的(不太理想的)方法:

  • 订阅 Observable A 一次(不是 switchmap)
  • 在订阅中为每个 http 调用创建 8 个 Observable,并订阅 8 个 Observable 的 ForkJoin

代码:(完整代码位于 https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj )

let source = Observable
.interval(5000)
.take(100);

 source.subscribe(x=> {
   console.log(x);
   let requests = [];

   for(let i=0; i<8;i++) {
     let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
     request.subscribe(res => console.log(res.json()));
     requests.push(request);
   }

   Observable.forkJoin(requests)
   .subscribe(console.log("All requests completed"));

   requests.forEach(r => r.connect());

 });

这行得通。但是有一个明显的缺陷,即每次 Observable A 发出时我都会创建 8+1 个嵌套的 observables/订阅。

(在这两种情况下,我都使用发布/连接来共享/重用订阅,但即使没有它也存在问题)

最佳答案

如果您使用多个参数正确调用 zip 并传递一个函数来订阅(不是未定义的 console.log 的结果),那么您的第一个示例将起作用。 Demo .

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function

requests.forEach(r => r.connect());

关于javascript - RxJS zip 在 forkJoin 时不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44901154/

相关文章:

angular - 如何从angular2中的函数调用视频暂停事件

javascript - 如何计算按钮点击次数

javascript - 如何将 JSON 对象数组传递给 MVC Controller ?

javascript - 从函数内部所需的数据填充空对象?

javascript - Angular CDK - 在嵌套的可滚​​动 div 内滚动和拖动元素的问题

Angular2 - Validators.compose

angular - 如何检查多个订阅是否达到onComplete()?

javascript - rxjs switchmap observable,为最终保存变量

javascript - 如何用async/await获取第二个 `then`回调参数?

javascript - svg.js:水平偏移换行符 tspans 以创建交错外观