rxjs - rxjs 中第一次发射的 BufferCount(2, 1)

标签 rxjs observable reactive-programming

我有一个可观察的:

1----2----3----3----2----1-|

我用

.bufferCount(2, 1)

输出是

--[1,2]-[2,3]-[3,3]-[3,2]-[2,1]-|

然后我链

.filter(twoEmissions => twoEmissions[0] !== twoEmissions[1])

这使得:

 --[1,2]-[2,3]-------[3,2]-[2,1]-|

然后我链

.map(twoEmissions => twoEmissions[1])

只留下最新的排放量,所以我的最终输出是:

-----2-----3------------2----1---|

我的问题是在这种情况下省略了第一次发射。 我尝试使用 buffer() 代替 bufferCount() 和以下 openingNotifier:

bufferClosingNotifier = sourceObservable$
  .scan((acc, val, index) => (index), 0)
  .filter((index: number) => index === 0 || index > 0 && index % 2 !== 0)

它发出第 1 次、第 3 次、第 5 次、第 7 次...发射,但它们不重叠。

我怎样才能有 bufferCount(2, 1) 但无论如何都会发出源可观察的第一次发射?

最佳答案

如果您的要求如下:

给定一个值流:

1----2----3----3----2----1-|

我只想要后续的唯一值,以便结束流

1----2----3---------2----1-|

您可以使用.distinctUntilChanged()获得此行为:

public distinctUntilChanged(compare: function): Observable source

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item.

If a comparator function is provided, then it will be called for each item to test for whether or not that value should be emitted.

If a comparator function is not provided, an equality check is used by default.

Rx.Observable.from([1,2,3,3,2,1])
  .distinctUntilChanged()
  .subscribe(val => console.log(val))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.js"></script>

关于rxjs - rxjs 中第一次发射的 BufferCount(2, 1),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47922243/

相关文章:

flutter - 抖动错误:未为 'combineLatest4'类型定义 'Observable'方法

ios - 试图了解 ReactiveCocoa

Angular RxJs : Poll HTTP request until timeout or positive response from server

redux - NGRX/Effects 将数据传递给 catchError

javascript - 仅在特定条件下触发 RXJS Stream

java - FlatMap 调用后,Reactor 不支持 runOn

java - Observable.just(doSomeLongStuff()) 在订阅 observable 之前运行 doSomeLongStuff()

javascript - 如何在 rxjs@5 中制作异步管道?

typescript - RxJS:确保定义了可选对象属性

angular - Observable<any> 不可分配给类型 Observable<any>