rxjs - 去抖动和缓冲 rxjs 订阅

标签 rxjs rxjs5

我有一个消息队列处理器,可以将消息提供给服务......

q.on("message", (m) => {
  service.create(m)
    .then(() => m.ack())
    .catch(() => n.nack())
})
该服务使用 RxJS Observable 和订阅 debounceTime()那些请求。
class Service {
  constructor() {
    this.subject = new Subject()
    this.subject.debounceTime(1000)
      .subscribe(({ req, resolve, reject }) =>
        someOtherService.doWork(req)
          .then(() => resolve())
          .catch(() => reject())
      )
  }

  create(req) {
    return new Promise((resolve, reject) =>
      this.subject.next({
        req,
        resolve,
        reject
      })
    )
  }
}
问题是只有去抖动的请求才会得到 ackd/nackd。如何确保订阅也解决/拒绝其他请求? bufferTime()让我参与其中,但它不会将每次调用的超时持续时间重置为 next() .

最佳答案

对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为类似于 debounce() + buffer()如上一个答案。
我叫它bufferDebounce带有类型推断的 Typescript 中的片段在这里:

import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer =>
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    }),
  );
您可以在此示例中测试其行为以检查这是否适合您 https://stackblitz.com/edit/rxjs6-buffer-debounce

关于rxjs - 去抖动和缓冲 rxjs 订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50515357/

相关文章:

javascript - "Unsubscribe"函数回调/Observable 中的钩子(Hook) "executor"函数

angular - 在 Angular 中运行 dist 文件夹是否需要 node_modules 和 package.json

angular - 如何在不需要 rxjs-compat 的情况下只导入 RxJS 6 中使用的运算符,如旧的 RxJS?

javascript - RxJs 映射后未定义的值

typescript - RXJS 控制可观察调用

timeout - rxjs 创建可观察超时总是错误

javascript - 为什么 Angular 2 的 CanActivate 返回 Observable<boolean> 而不是 Promise?

http - Angular2 HTTP 使用 observables 订阅显示数据未定义

angular - 与其他组件或服务共享可观察量

javascript - 为什么bindCallback不是一个函数?