我想使用向服务器发出一系列请求,但服务器有每秒 10 个请求的硬速率限制。如果我尝试循环发出请求,它将达到速率限制,因为所有请求将同时发生。
for(let i = 0; i < 20; i++) {
sendRequest();
}
ReactiveX 有很多修改可观察流的工具,但我似乎找不到实现速率限制的工具。我尝试添加标准延迟,但请求仍然同时触发,只比之前晚了 100 毫秒。
const queueRequest$ = new Rx.Subject<number>();
queueRequest$
.delay(100)
.subscribe(queueData => {
console.log(queueData);
});
const queueRequest = (id) => queueRequest$.next(id);
function fire20Requests() {
for (let i=0; i<20; i++) {
queueRequest(i);
}
}
fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);
debounceTime
和 throttleTime
运算符也与我正在寻找的类似,但它是有损的而不是无损的。我想保留我提出的每个请求,而不是丢弃较早的请求。
...
queueRequest$
.debounceTime(100)
.subscribe(queueData => {
sendRequest();
});
...
如何在不超过使用 ReactiveX 和 Observables 的速率限制的情况下向服务器发出这些请求?
最佳答案
OP 中的实现 self answer (在 linked blog 中)总是会造成不理想的延迟。
如果限速服务允许每秒 10 个请求,则应该可以在 10 毫秒内发出 10 个请求,只要在另外 990 毫秒内没有发出下一个请求。
下面的实现应用了可变延迟以确保限制得到执行,并且延迟仅应用于会看到超出限制的请求。
function rateLimit(source, count, period) {
return source
.scan((records, value) => {
const now = Date.now();
const since = now - period;
// Keep a record of all values received within the last period.
records = records.filter((record) => record.until > since);
if (records.length >= count) {
// until is the time until which the value should be delayed.
const firstRecord = records[0];
const lastRecord = records[records.length - 1];
const until = firstRecord.until + (period * Math.floor(records.length / count));
// concatMap is used below to guarantee the values are emitted
// in the same order in which they are received, so the delays
// are cumulative. That means the actual delay is the difference
// between the until times.
records.push({
delay: (lastRecord.until < now) ?
(until - now) :
(until - lastRecord.until),
until,
value
});
} else {
records.push({
delay: 0,
until: now,
value
});
}
return records;
}, [])
.concatMap((records) => {
const lastRecord = records[records.length - 1];
const observable = Rx.Observable.of(lastRecord.value);
return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
});
}
const start = Date.now();
rateLimit(
Rx.Observable.range(1, 30),
10,
1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
关于javascript - 如何使用 RxJS 5 无损地对限制请求进行评级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42284020/