c# - 当任何值由另一个 Observable 产生时,选择 Observable 的最新值

标签 c# system.reactive

我有一个从常规 .NET 事件生成的 Observable。 Observable 是热的——不是暖的——从某种意义上说,它甚至在任何订阅之前就开始产生值(value),并且每次有人订阅时,它都会收到最新产生的值(value)。我们将其命名为 eventStream

然后我有另一个 Observable,由另一个类公开,它表示一些状态流,因此每个新值都给出该类管理的某些事物的当前状态。这个 Observable 也很热门。我们将其命名为 stateStream

每次事件序列产生一个新值时,我都想选择(我会说样本,但这可能会导致混淆)状态序列提供的最新值。这应该产生一个新的序列,组合两个值,然后处理它们,等等。

这是我想出来的,但它似乎不起作用:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;

eventStream.Select(eventValue => 
  stateStream
    .Take(1)
    .Select(stateValue => new { Event = eventValue, State = stateValue }))
  .Switch()
  .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
  .Subscribe(value => /* do something */);

这背后的基本原理取 self 工作过的其他类似场景,其中某个源产生的新值导致新订阅运行,因此返回一个 Observable,最后返回 IObservable<IObservable<...>>使用 Switch() 或一些类似的运算符再次被压缩成一个一维 IObservable。
但是在这种情况下,从快速测试来看,似乎没有新的订阅,并且只产生了第一个 stateStream 值。相反,我想在每次 Take(1) 触发 时选择第一个值 (eventStream)。

据我所知,CombineLatestZip 不符合要求:每当两个序列之一提供新值时,CombineLatest 就会触发;每次两个序列都有可用的新值时,Zip 就会触发,通常这意味着当两个序列中最慢的一个有值时。出于与 And/Then/When 相同的原因,Zip 也不应该是正确的。

我还检查了 SO thread combining one observable with latest from another observable ,但我认为这不适用于此处。仅在我阅读的一条评论中

[...] and then Scan acts like a CombineLatest that filters for notifications from only one side

不知何故,这听起来很熟悉,但我无法理解。

最佳答案

我想你想要 Observable.Sample()

stateSource.Sample(eventSource)
     .Zip(eventSource,...)

关于c# - 当任何值由另一个 Observable 产生时,选择 Observable 的最新值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31124824/

相关文章:

c# - 使 System.Reactive 重复新订阅的最后 X 项

c# - 如何在 Visual Studio 中使用堆栈跟踪进行调试?

c# - OpenXml:工作表子元素的顺序更改导致文件损坏

android - 动态组合多个 Retrofit Observable

c# - subject.OnError 抛出

c# - SelectMany 使用 ReactiveExtensions 占用大量内存

c# - 大理石图

c# - 如何在 MVC3 的 View 文件中使用多 FOR 循环?

C# 打开 Excel 模板,复制行格式并将新数据插入到每个新行中

c# - 将 PropertyChangedCallback 标记为异步是否安全?