目前,只有当所有压缩的可观察量产生一个值时,zip 才会产生一个值。例如。来自文档:
Merges the specified observable sequences or Promises into one observable sequence by using the selector function whenever all of the observable sequences have produced an element
我正在寻找一个可观察量,它可以对可观察量进行压缩,但会生成压缩可观察量的序列数组,其中如果全部产生一个值并不重要..
例如可以说我有tick$、observ1、observ2..tick$总是每x秒产生值..而observ1和observ2仅不时产生.. 我期望我的流看起来像
[tick, undefined, observ2Res],
[tick, undefined, undefined],
[tick, observ1Res, observ2Res]
...
...
它不是合并最新的,因为合并最新获取给定可观察量的最新值。
最佳答案
我相信buffer
(或者sample
)可能会让您走上正确的道路。 buffer
方法接受一个用于定义缓冲区边界的 Observable。生成的流会发出在该窗口中发出的任何项目(例如从 RXJS 文档中窃取的 buffer
):
var source = Rx.Observable.timer(0, 50)
.buffer(function () { return Rx.Observable.timer(125); })
.take(3);
var subscription = source.subscribe(x => console.log('Next: ', x));
// => Next: 0,1,2
// => Next: 3,4,5
// => Next: 6,7
因此,我们现在有一种方法可以获取特定时间窗口内流的所有发出事件。在您的情况下,我们可以使用 tick$
来描述我们的采样周期,而 observ1
和 observ2
是我们想要缓冲的底层流:
const buffered1 = observ1.buffer(tick$);
const buffered2 = observ2.buffer(tick$);
每个流都会在每个tick$周期内发射一次,并将从底层流(在该周期内)发射所有发射项目的列表。 缓冲
流将发出如下数据:
|--[]--[]--[1, 2, 3]--[]-->
为了获得您想要的输出,我们可以选择仅查看每个缓冲结果的最新发出的项目,如果没有发出的数据,我们可以传递 null:
const buffered1 = observ1.buffer($tick).map(latest);
const buffered2 = observ2.buffer($tick).map(latest);
function latest(x) {
return x.length === 0 ? null : x[x.length - 1];
}
我之前演示的示例流现在看起来像这样:
|--null--null--3--null-->
最后,我们可以zip
这两个流来获取tick$
间隔期间“最新”发出的数据:
const sampled$ = buffered1.zip(buffered2);
此sampled$
流将通过tick$
从我们的observ1
和observ2
流发出最新数据 window 。这是示例结果:
|--[null, null]--[null, 1]--[1, 2]-->
关于rxjs - zip 的替代方案,每当任何可观察值发出值时都会产生值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35897943/