rxjs - 如何使用 rxjs 5 创建一个 pausableBuffer

标签 rxjs rxjs5

我正在尝试制作我认为的 pausable buffer

我有人为此分享了他们的代码,但我不知道如何将其变成自定义操作(没有 typescript /只有 ES6.

const attach = Rx.Observable.timer(0 * 1000, 8 * 1000).mapTo('@');
const detach = Rx.Observable.timer(4 * 1000, 8 * 1000).mapTo('#');

const input = Rx.Observable.interval(1* 1000);
const pauser = attach.mapTo(true).merge(detach.mapTo(false));

input
  .publish(_input => _input
    .combineLatest(pauser, (v, b) => b)
    .filter(e => e)
    .publish(_switch => _input.bufferWhen(() => _switch.take(1)))
  )
  .flatMap(e => Rx.Observable.from(e))
  .concatMap(e => Rx.Observable.empty().delay(150).startWith(e))

有人可以帮我创建它,这样我就可以做到 input.pausableBuffer(pauser) (也许定义一个startsWith)。

最佳答案

您可以像这样将其添加到原型(prototype)中:

var pausableBuffer = function(pauser) {
  return this.publish(_input => _input
    .combineLatest(pauser, (v, b) => b)
    .filter(e => e)
    .publish(_switch => _input.bufferWhen(() => _switch.take(1)))
  )
  .flatMap(e => Rx.Observable.from(e));
}

Rx.Observable.prototype.pausableBuffer = pausableBuffer;

要记住的一件事是,这将从暂停状态开始。要改为以事件状态启动它,请添加 .startWith(true)pauser .
var pausableBuffer = function(pauser) {
  return this.publish(_input => _input
    .combineLatest(pauser.startWith(true), (v, b) => b)
    .filter(e => e)
    .publish(_switch => _input.bufferWhen(() => _switch.take(1)))
  )
  .flatMap(e => Rx.Observable.from(e));
}

Rx.Observable.prototype.pausableBuffer = pausableBuffer;

2019 年更新:RxJs 6 风格:
var pausableBuffer = function(pauser) {
  return (source) => source.pipe(publish(_input => 
  combineLatest(_input, pauser.pipe(startWith(true))).pipe(
    map(([inp, pa]) => pa),
    filter(pa => pa),
    publish(_switch => _input.pipe(bufferWhen(() => _switch.pipe(take(1)))))
  )),
    mergeMap(e => from(e))
  );
}

Demo

关于rxjs - 如何使用 rxjs 5 创建一个 pausableBuffer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40557715/

相关文章:

drag-and-drop - RXJS 拖放

Angular 2 - 来自 Observable 的排序列表

rxjs - 共享 observable 和 startWith 运算符

Angular2 如何更新可观察集合中的项目

angular - 捕获和处理Angular中的后端错误

rxjs - RxJS 中的 tap 和 map 有什么区别?

RxJS5 - 等待 2 个 http 调用,然后监听套接字

javascript - rxjs 冷 obs 存储所有消息

javascript - "Unsubscribe"函数回调/Observable 中的钩子(Hook) "executor"函数

angular - rxjs 的条件发射延迟