javascript - RXJS 可观察拉伸(stretch)

标签 javascript rxjs reactive-programming

我有一个 Rx.Observable.webSocket 主题。我的服务器端点无法处理同时接收的消息(<25 毫秒)。现在我需要一种方法来扩展我的 websocket 主题的 next() 调用。

我创建了另一个主题 requestSubject 并订阅了它。 然后在订阅内调用下一个 websocket。

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

使用延迟移动每个下一个调用相同的延迟时间,然后所有下一个调用稍后发出相同的时间......这不是我想要的。

我尝试了delaythrottledebounce,但都不合适。

下面应该说明我的问题

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-

最佳答案

不得不修改一下,它并不像看起来那么容易:

//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

基本上我们需要计算发射之间所需的延迟,以便我们可以将它们隔开。我们使用原始发射的 timestamp() 和并发为 1 的 mergeMap 重载来执行此操作,以便仅在前一个发射时订阅下一个延迟值。这是一种没有其他副作用的纯 Rx 解决方案。

关于javascript - RXJS 可观察拉伸(stretch),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45132579/

相关文章:

javascript - 如何在不替换节点属性的情况下替换文本

javascript - typescript 定义 - 嵌套函数

javascript - 单击菜单项时将类添加到 div

java - publishOn 和并行的区别

stream - RxJS 可以以基于拉动的方式使用吗?

javascript - 我做错了什么(Discord 上的简单播音员机器人)

angular - 使用异步管道轮询 Observables?

javascript - RxJS - 哪个运算符可以结合最近 N 个事件的历史?

javascript - 在 RxJS 中以 block 的形式运行一组可观察对象

apache-kafka - 如何在 Webflux 应用程序中制作 Spring Cloud Stream 消费者?