c# - 如何使用 Reactive Extensions (Rx.Net) 等待一个值或直到经过一段固定的时间

标签 c# system.reactive

我想等待(阻塞)一个线程,直到某个时间过去或另一个流抽取一个值,我认为以下可能会实现这一点,但它会抛出异常,因为第一个流是空的,

 // class level subject manipulated by another thread...
 _updates = new Subject<Unit>();
 ...
 // wait for up to 5 seconds before carrying on...    
 var result = Observable.Timer(DateTime.Now.AddSeconds(5))
    .TakeUntil(_updates)
    .Wait();

我怎样才能实现阻塞长达 5 秒或直到其他流抽取值的能力?

最佳答案

您可以像这样使用 Observable.Timeout:

 var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();

我使用 Take(1) 因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出 System.TimeoutException

如果您不想要异常 - 您可以使用 Catch 来提供一些值:

var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
    .Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all

如果您的 Unit 确实是@Shlomo 提到的 rx 单元 - 您可以这样更改它:

var result = _updates.Select(c => (Unit?) c).Take(1)
    .Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();

或者像往常一样捕获异常。

关于c# - 如何使用 Reactive Extensions (Rx.Net) 等待一个值或直到经过一段固定的时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43217146/

相关文章:

c# - 在多个页面上重复使用 WPF 方法 - 我应该使用静态类还是单例?

c# - 如何获取 WPF 中的内置路由事件列表

c# - 如何为响应式扩展制作自定义扩展

c# - 有没有办法在方法之间进行数学加法?

c# - 确定 ListView 中点击的列

c# - 带有 HiveMQ 的 M2Mqtt 库

c# - c# 中是否已有条件 Zip 函数?

c# - 响应式扩展 .MaxBy

mvvm - 你会为 Rx.Observable 创建一个服务吗?

c# - 我如何确定 IObservable<T> 上有多少/明确的订阅者?