我有一个从常规 .NET 事件生成的 Observable。 Observable 是热的——不是暖的——从某种意义上说,它甚至在任何订阅之前就开始产生值(value),并且每次有人订阅时,它都会收到最新产生的值(value)。我们将其命名为 eventStream
。
然后我有另一个 Observable,由另一个类公开,它表示一些状态流,因此每个新值都给出该类管理的某些事物的当前状态。这个 Observable 也很热门。我们将其命名为 stateStream
。
每次事件序列产生一个新值时,我都想选择(我会说样本,但这可能会导致混淆)状态序列提供的最新值。这应该产生一个新的序列,组合两个值,然后处理它们,等等。
这是我想出来的,但它似乎不起作用:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
stateStream
.Take(1)
.Select(stateValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => /* do something */);
这背后的基本原理取 self 工作过的其他类似场景,其中某个源产生的新值导致新订阅运行,因此返回一个新 Observable,最后返回 IObservable<IObservable<...>>
使用 Switch()
或一些类似的运算符再次被压缩成一个一维 IObservable。
但是在这种情况下,从快速测试来看,似乎没有新的订阅,并且只产生了第一个 stateStream
值。相反,我想在每次 Take(1)
触发 时选择第一个值 (eventStream
)。
据我所知,CombineLatest
和 Zip
不符合要求:每当两个序列之一提供新值时,CombineLatest
就会触发;每次两个序列都有可用的新值时,Zip
就会触发,通常这意味着当两个序列中最慢的一个有值时。出于与 And/Then/When
相同的原因,Zip
也不应该是正确的。
我还检查了 SO thread combining one observable with latest from another observable ,但我认为这不适用于此处。仅在我阅读的一条评论中
[...] and then Scan acts like a CombineLatest that filters for notifications from only one side
不知何故,这听起来很熟悉,但我无法理解。
最佳答案
我想你想要 Observable.Sample()
stateSource.Sample(eventSource)
.Zip(eventSource,...)
关于c# - 当任何值由另一个 Observable 产生时,选择 Observable 的最新值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31124824/