rxjs - zip 的替代方案,每当任何可观察值发出值时都会产生值

标签 rxjs

目前,只有当所有压缩的可观察量产生一个值时,zip 才会产生一个值。例如。来自文档:

Merges the specified observable sequences or Promises into one observable sequence by using the selector function whenever all of the observable sequences have produced an element

我正在寻找一个可观察量,它可以对可观察量进行压缩,但会生成压缩可观察量的序列数组,其中如果全部产生一个值并不重要..

例如可以说我有tick$、observ1、observ2..tick$总是每x秒产生值..而observ1和observ2仅不时产生.. 我期望我的流看起来像

[tick, undefined, observ2Res],
[tick, undefined, undefined],
[tick, observ1Res, observ2Res]
...
...

它不是合并最新的,因为合并最新获取给定可观察量的最新值。

最佳答案

我相信buffer(或者sample)可能会让您走上正确的道路。 buffer 方法接受一个用于定义缓冲区边界的 Observable。生成的流会发出在该窗口中发出的任何项目(例如从 RXJS 文档中窃取的 buffer):

var source = Rx.Observable.timer(0, 50)
  .buffer(function () { return Rx.Observable.timer(125); })
  .take(3);

var subscription = source.subscribe(x => console.log('Next: ', x));

// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7

因此,我们现在有一种方法可以获取特定时间窗口内流的所有发出事件。在您的情况下,我们可以使用 tick$ 来描述我们的采样周期,而 observ1observ2 是我们想要缓冲的底层流:

const buffered1 = observ1.buffer(tick$);
const buffered2 = observ2.buffer(tick$);

每个流都会在每个tick$周期内发射一次,并将从底层流(在该周期内)发射所有发射项目的列表。 缓冲流将发出如下数据:

|--[]--[]--[1, 2, 3]--[]-->

为了获得您想要的输出,我们可以选择仅查看每个缓冲结果的最新发出的项目,如果没有发出的数据,我们可以传递 null:

const buffered1 = observ1.buffer($tick).map(latest);
const buffered2 = observ2.buffer($tick).map(latest);

function latest(x) {
    return x.length === 0 ? null : x[x.length - 1];
}

我之前演示的示例流现在看起来像这样:

|--null--null--3--null-->

最后,我们可以zip这两个流来获取tick$间隔期间“最新”发出的数据:

const sampled$ = buffered1.zip(buffered2);

sampled$流将通过tick$从我们的observ1observ2流发出最新数据 window 。这是示例结果:

|--[null, null]--[null, 1]--[1, 2]-->

关于rxjs - zip 的替代方案,每当任何可观察值发出值时都会产生值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35897943/

相关文章:

angular - 异步管道在 Angular 的输入中不起作用

javascript - 这可以重构为示例中看到的更具可组合性的样式吗?

Angular 2 : Chain http requests with concat()

redux - 从 redux-observable 中调度多个 Action

angular - 分派(dispatch)新操作时,如何避免通过循环存储对象而重新渲染组件

javascript - RxJS 分组发出的事件 nodejs

rxjs - 使用内部可观察谓词的管道过滤器

javascript - 通过 forkJoin 和响应识别的并行 http 请求

javascript - Rxjs:使用 bufferCount 时如何获取最后一个值

javascript - Angular TS 错误 TS1109