我想等待(阻塞)一个线程,直到某个时间过去或另一个流抽取一个值,我认为以下可能会实现这一点,但它会抛出异常,因为第一个流是空的,
// class level subject manipulated by another thread...
_updates = new Subject<Unit>();
...
// wait for up to 5 seconds before carrying on...
var result = Observable.Timer(DateTime.Now.AddSeconds(5))
.TakeUntil(_updates)
.Wait();
我怎样才能实现阻塞长达 5 秒或直到其他流抽取值的能力?
最佳答案
您可以像这样使用 Observable.Timeout
:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5)).Wait();
我使用 Take(1)
因为超时期望序列完成,而不仅仅是产生下一个值。超时时会抛出 System.TimeoutException
。
如果您不想要异常 - 您可以使用 Catch
来提供一些值:
var result = _updates.Take(1).Timeout(DateTime.Now.AddSeconds(5))
.Catch(Observable.Return(default(Unit))).Wait();
// should catch specific exception, not all
如果您的 Unit
确实是@Shlomo 提到的 rx 单元 - 您可以这样更改它:
var result = _updates.Select(c => (Unit?) c).Take(1)
.Timeout(DateTime.Now.AddSeconds(5)).Catch(Observable.Return((Unit?) null)).Wait();
或者像往常一样捕获异常。
关于c# - 如何使用 Reactive Extensions (Rx.Net) 等待一个值或直到经过一段固定的时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43217146/