javascript - 来自 RxJS 请求流的同步响应流

标签 javascript reactive-programming rxjs

我是 RxJS 的新手,想知道是否有人可以帮助我。

我想从请求流(有效负载数据)创建同步响应流(最好是使用相应的请求)。

我基本上希望请求一个一个发送,每个等待最后一个的响应。

我试过了,但它会立即发送所有内容(jsbin):

var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};

以下在一定程度上有效,但不对请求数据使用流 ( jsbin )。

var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};

谢谢!

编辑:

澄清一下,这就是我想要实现的目标:

“发送 A,当您收到 A 的响应时,发送 B,当您收到 B 的响应时,发送 C,等等……”

按照 user3743222 的建议,使用 concatMap 和 defer 似乎可以做到这一点(jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);

最佳答案

尝试在您的第一个代码示例中将 flatMap 替换为 concatMap,如果结果行为符合您的要求,请告诉我。

responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

基本上 concatMapflatMap 具有相似的签名,行为上的区别在于它将等待当前可观察对象被展平完成,然后再继续下一个。所以在这里:

  • requestStream 值将被推送到 concatMap 运算符。
  • concatMap 运算符将生成一个 sendRequest 可观察对象,以及该可观察对象之外的任何值(似乎是一个元组 (val, response)) 将通过选择器函数传递,其对象结果将传递到下游
  • sendRequest 完成时,将处理另一个 requestStream 值。
  • 简而言之,您的请求将被一一处理

或者,您可能想使用defer 来推迟sendRequest 的执行。

responseStream = requestStream.concatMap(//I replaced `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
  (val, response)=>{ return {val, response}; }
);

关于javascript - 来自 RxJS 请求流的同步响应流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35187048/

相关文章:

javascript - Phaser.js 拖放问题

angular - 自定义错误处理程序中的RxJS传递函数

angular - 使用 RxJS forkJoin 时传递参数的最佳方法

javascript - Vue组件中的"You may have an infinite update loop in a component render function"警告

javascript - cordova 中的 navigator.geolocation.getCurrentPosition 仅提供 10 米的精度

javascript - 出错后继续订阅

stream - Apache Kafka Streams 将 KTables 物化为主题似乎很慢

scala - 玩mongo枚举器意外停止

javascript - 将修改后的数组发布到 Observable

javascript - 如何使用angularjs在html中打印数组对象数组