javascript - RxJS 连接/合并异步请求超时

标签 javascript node.js rxjs rxjs5

我想让 RxJS 可观察,这将同时发出异步请求和超时。
-> promise -> filter -> promise and timeout -> complete
在具有异步/等待代码的同步版本中,代码如下所示:

(async () => {
  const gameInfo = await function1(); // return undefined or {start, id}

   if (!gameInfo) {
     return;
   }

  const gameStart = gameInfo.start;

  setTimeout(() => {
     function3(gameInfo._id).then(res => {
         console.log('Done');
     });
  }, getDelayTime(gameStart));

 await function2();

 // Completed
})();

const getDelayTime = (time) => {
  const now = new Date();
  const parsedTime = new Date(time);

  console.log('In delay', parsedTime.getTime() - now.getTime());
  return parsedTime.getTime() - now.getTime();
};

所以目标是从function1()异步接收一些数据,然后对它的第一部分进行一些操作,同时启动定时器,在时间结束时进行第二次操作,而不是不管 function3() 是否结束。


所以,在 RxJS 中,我带来了这样的东西:

Rx.Observable
  .fromPromise(function1())
  .filter(data => data)
  .concatMap(data => {
    Rx.Observable.fromPromise(function2());
    Rx.operators.delay(data => Rx.operators.timeout(getDelayTime(data.start)));
  })
  .subscribe(
    data => {
       console.log('Data', data); 
       function3(data._id)
    }
    err => console.error('Err', err.message),
    () => console.log('Completed'),
  );

但是,对于我的目标而言,它并没有像预期的那样工作。

如果我把我的代码改成这样:

Rx.Observable
  .fromPromise(function1())
  .filter(data => data)
  .delayWhen(data => getDelayTime(data.start)) // or .timeout() or .delay
  .map(data => {Rx.Observable.fromPromise(function2());})

它不起作用。
所以,我的问题是,如何在 RxJS 中进行并发计时器和异步 http 请求

最佳答案

延迟调用的一种方法是执行 timer,然后执行 *map(flatMap、switchMap、...)运算符。它将等待持续时间,然后订阅您放入 *map 中的任何内容。

您可以使用 combineLatestwithLatestFrom 等类似方式订阅多个可观察对象。但通常人们想对结果做些什么。

如果您不关心结果,您总是可以使用 tap 运算符来触发副作用。或者您可以在订阅的 onNext 回调中执行此操作。

我认为这可以满足您的需求,但老实说这是一种奇怪的用法。如果您可以分享更多关于您正在努力完成的事情,我们可能会提供更好的建议。

function test(id, timeout) {
  console.log('called', id);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log('resolved', id);
      resolve();
    }, timeout);
  });
}

Rx.Observable.fromPromise(test(1, 1000))
  .flatMap(() => Rx.Observable.combineLatest(
    Rx.Observable.fromPromise(test(2, 3000)),
    Rx.Observable.timer(1234).flatMap(() => 
      Rx.Observable.fromPromise(test(3, 100)))
  ))
  .subscribe(() => { console.log('onNext'); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>

关于javascript - RxJS 连接/合并异步请求超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49795527/

相关文章:

javascript - POST 请求未将文件上传到服务器

javascript - RxJs:http.get 正在获取一个数组,但我希望订阅者获取单个元素

javascript - 使用node.js和express.js的图片显示错误

rxjs 不发出直到值不为空

javascript - RXJS retryWhen 重置等待间隔

javascript - jQuery - 像 Windows 8 开始屏幕一样滚动时动画背景图像的可能插件或方法?

javascript - getDownloadURL 无法正常工作 - Firebase 存储 (Web)

javascript - 为什么我的 html5 canvas 动画如此闪烁?

javascript - 如何在 Ionic 3 中导入外部 JS 文件

javascript - 更新 Mongoose 中的文档(更新特定索引处的数组属性)