promise - RxJs 缓冲直到数据库插入( promise )

标签 promise rxjs buffer rxjs5

我有一个数据流,数据快速传入。我想通过保持顺序将它们插入数据库。我有一个数据库,它返回一个 promise ,当插入成功时该 promise 将得到解决。

我想创建一个 Rx 流,它缓冲新数据,直到插入缓冲的数据。

我怎样才能做到这一点?

最佳答案

我相信要获得您想要的东西,您需要创建自己的运算符。稍微脱离 RxJS 你可以得到类似的东西(警告,尚未测试)...

export class BusyBuffer<T> {
  private itemQueue = new Subject<T>();
  private bufferTrigger = new Subject<{}>();
  private busy = false;

  constructor(consumerCallback: (items: T[]) => Promise<void>) {
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
      this.busy = true;
      consumerCallback(items).then(() => {
        this.busy = false;
        this.bufferTrigger.next(null);
      });
    });
  }

  submitItem(item: T) {
    this.itemQueue.next(item);
    if(!busy) {
      this.bufferTrigger.next(null);
    }
  }

}

然后可以用作

let busyBuffer = new BusyBuffer<T>(items => {
  return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));

但这并不完全是纯粹的响应式(Reactive),有人也许能够想出更好的东西。

关于promise - RxJs 缓冲直到数据库插入( promise ),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46828884/

相关文章:

javascript - 处理 promise

javascript - Angularjs:如何编写 $http 作为 Controller 的 promise

javascript - 我怎么能中止 JavaScript Promise?

angular - rxjs 主题 switchmap http 错误

javascript - 从 redux-observable 史诗中访问状态

node.js - MongoDB 仅使用中间件插入 UUID?

javascript - 如何将两个单独的查询传递给 promise 链末尾的回调函数?

javascript - 在第一个可观察对象发出后,在第二个可观察对象的第一次发出时发出

javascript - Nodejs Javascript TypedArray 缓冲到字符串并再次返回

javascript - 将字符串作为 Uint8 缓冲区发送