node.js - RxJS - 使用 zip 并行化导致 HTTP 请求无法正常工作的延迟 Observables - 您在需要流的地方提供了无效的对象

标签 node.js rxjs

我正在运行一个 Node.js API 端点,该端点必须在其端点之一并行化多个 HTTP 调用。

以星球大战 API 为例。我通过id获取用户信息。它包含一系列电影,我想并行检索所有这些电影信息。

获取人员信息后,我创建了一系列可观察对象,每个观察对象都获取其中一部电影。我设置了一个 zip 运算符来组合所有获取的电影的结果。

重点是,订阅后我看了 n 次,每部电影一次,console.log(filmData)信息,这是正确的。但似乎 zip 回调带有 console.log("*********************************************************************");不被调用。接下来的、错误的、完整的回调也都不是。

为什么会这样?

client.get("https://swapi.co/api/people/"+uid, args, (data, response) => {
    var filmsIt = data.films;
    for(var i in filmsIt){
        var observable = Rx.Observable.defer(function () {
            client.get(filmsIt[i], args, (filmData, filmResponse) => {
                console.log(filmData);
                return filmData;
            });
        });
        observables.push(observable);
    }
    var observableFinal = Rx.Observable.zip(...observables, function() {
        console.log("*********************************************************************");
    }).subscribe(
        function (x) {
            console.log('Next: ' + x);
        },
        function (err) {
            console.log('Error: ' + err);
        },
        function () {
            console.log('Completed');
        });
});

更新:按照马丁的回答,将调用回调。但是,我现在收到此错误:

Error: TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

我已经更改了延迟函数以返回一个 Observable,如下所示:

var observable = Rx.Observable.defer(function () {
            return Rx.Observable.of(client.get(filmsIt[i], args, (filmData, filmResponse) => {
                console.log(filmData);
                return filmData;
            });
        }));

问题是,现在 zip 函数获取的不是 HTTP 调用的实际值,而是一堆 ClientRequest 对象:

  { '0': 
      ClientRequest {
        domain: null,
        _events: {},
        _eventsCount: 0,
        _maxListeners: undefined,
        href: 'https://swapi.co/api/films/7/',
        options: [Object],
        _httpRequest: [Object] },
     '1': 
      ClientRequest {
        domain: null,
        _events: {},
        _eventsCount: 0,
        _maxListeners: undefined,
        href: 'https://swapi.co/api/films/7/',
        options: [Object],
        _httpRequest: [Object] },
      ...

最佳答案

当使用Observable.defer时,你需要从回调函数返回一个Observable(或者Promise或者其他什么,参见http://reactivex.io/rxjs/class/es6/MiscJSDoc.js~ObservableInputDoc.html)。

我猜你想做这样的事情:

var observable = Rx.Observable.defer(function () {
    return client.get(filmsIt[i], args, (filmData, filmResponse) => {
        console.log(filmData);
        return filmData;
    });
});

关于node.js - RxJS - 使用 zip 并行化导致 HTTP 请求无法正常工作的延迟 Observables - 您在需要流的地方提供了无效的对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45671376/

相关文章:

node.js - 将 Mocha 中的对象与 MongoDb 中的日期进行比较

javascript - 在设置 RxJs 订阅后,是否有一种干净的方法可以立即执行函数?

javascript - Rxjs:如何仅在之前发生另一个事件时才订阅事件?

Angular/RxJS : How to generate a sequence of delayed values?

javascript - 如何从函数创建 Observable?

javascript - 这可以重构为示例中看到的更具可组合性的样式吗?

javascript - Gatsby worker 错误 :

node.js - 项目中依赖项的包管理器

Node.js - 简单的 Restify POST Mocha 测试失败

node.js - docker-compose POSTGRES & SEQUELIZE 的数据库连接错误