我有一个 Rx.Observable.webSocket 主题。我的服务器端点无法处理同时接收的消息(<25 毫秒)。现在我需要一种方法来扩展我的 websocket 主题的 next() 调用。
我创建了另一个主题 requestSubject
并订阅了它。
然后在订阅内调用下一个 websocket。
requestSubject.delay(1000).subscribe((request) => {
console.log(`SENDING: ${JSON.stringify(request)}`);
socketServer.next(JSON.stringify(request));
});
使用延迟移动每个下一个调用相同的延迟时间,然后所有下一个调用稍后发出相同的时间......这不是我想要的。
我尝试了delay
、throttle
、debounce
,但都不合适。
下面应该说明我的问题
Stream 1 | ---1-------2-3-4-5---------6----
after some operation ...
Stream 2 | ---1-------2----3----4----5----6-
最佳答案
不得不修改一下,它并不像看起来那么容易:
//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
.mergeMap(i => Rx.Observable.of(i).delay(i))
.share();
stretchEmissions(source, 1000)
.subscribe(val => console.log(val));
function stretchEmissions(source, spacingDelayMs) {
return source
.timestamp()
.scan((acc, curr) => {
// calculate delay needed to offset next emission
let delay = 0;
if (acc !== null) {
const timeDelta = curr.timestamp - acc.timestamp;
delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
}
return {
timestamp: curr.timestamp,
delay: delay,
value: curr.value
};
}, null)
.mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
基本上我们需要计算发射之间所需的延迟,以便我们可以将它们隔开。我们使用原始发射的 timestamp()
和并发为 1 的 mergeMap
重载来执行此操作,以便仅在前一个发射时订阅下一个延迟值。这是一种没有其他副作用的纯 Rx 解决方案。
关于javascript - RXJS 可观察拉伸(stretch),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45132579/