javascript - 如何使用 RxJS 5 无损地对限制请求进行评级

标签 javascript rxjs reactive-programming

我想使用向服务器发出一系列请求,但服务器有每秒 10 个请求的硬速率限制。如果我尝试循环发出请求,它将达到速率限制,因为所有请求将同时发生。

for(let i = 0; i < 20; i++) {
  sendRequest();
}

ReactiveX 有很多修改可观察流的工具,但我似乎找不到实现速率限制的工具。我尝试添加标准延迟,但请求仍然同时触发,只比之前晚了 100 毫秒。

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);

debounceTimethrottleTime 运算符也与我正在寻找的类似,但它是有损的而不是无损的。我想保留我提出的每个请求,而不是丢弃较早的请求。

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...

如何在不超过使用 ReactiveX 和 Observables 的速率限制的情况下向服务器发出这些请求?

最佳答案

OP 中的实现 self answer (在 linked blog 中)总是会造成不理想的延迟。

如果限速服务允许每秒 10 个请求,则应该可以在 10 毫秒内发出 10 个请求,只要在另外 990 毫秒内没有发出下一个请求。

下面的实现应用了可变延迟以确保限制得到执行,并且延迟仅应用于会看到超出限制的请求。

function rateLimit(source, count, period) {

  return source
    .scan((records, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all values received within the last period.

      records = records.filter((record) => record.until > since);
      if (records.length >= count) {

        // until is the time until which the value should be delayed.

        const firstRecord = records[0];
        const lastRecord = records[records.length - 1];
        const until = firstRecord.until + (period * Math.floor(records.length / count));

        // concatMap is used below to guarantee the values are emitted
        // in the same order in which they are received, so the delays
        // are cumulative. That means the actual delay is the difference
        // between the until times.

        records.push({
          delay: (lastRecord.until < now) ?
            (until - now) :
            (until - lastRecord.until),
          until,
          value
        });
      } else {
        records.push({
          delay: 0,
          until: now,
          value
        });
      }
      return records;

    }, [])
    .concatMap((records) => {

      const lastRecord = records[records.length - 1];
      const observable = Rx.Observable.of(lastRecord.value);
      return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 30),
  10,
  1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

关于javascript - 如何使用 RxJS 5 无损地对限制请求进行评级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42284020/

相关文章:

javascript - 在页面上完全绘制后猫头鹰轮播的回调

javascript - 用于分页的 RXJS while 循环

java - Flux的onComplete执行后如何返回Mono?

ios - 如何将 ReactiveCocoa 与手势识别器一起使用

javascript - DropzoneJS XHR 发送选项请求

javascript - 无法让 "Draggable"工作

javascript - 获取 innerHTML 值 - 仅纯文本

javascript - Angular 返回未定义

typescript - Angular2 Http with RXJS Observable TypeError : this. http.get(...).map(...).catch 不是函数

reactive-programming - RXJS : Idiomatic way to create an observable stream from a paged interface