c# - 接收 : Wait for first item for a period of time

标签 c# task-parallel-library system.reactive

我想将我遗留的基于事件的方法转换为基于可观察的方法,但我对 Rx 很陌生,所以我现在被卡住了。

我有一个事件源,现在是可观察的。在某个时间点,我必须启动一个方法,该方法通过返回行中的下一个元素或返回 null(如果超时)结束。

基于事件的方法如下所示:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);

        EventHandler<ReaderEvent> localHandler = (o, e) =>
        {
            if (e.PlaceId == PlaceId)
            {
                result = e;
                cts.Cancel();
            }
        };

        ReaderEventHandler += localHandler;
        try
        {
            await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            //...
        }

        ReaderEventHandler -= localHandler;
    }

    return result;
}

如您所见,这个想法是延迟被我正在等待的事件的到来取消,或者 token 源在该特定时间后被配置取消。很干净。

现在,Rx 版本:

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        using (observable.Subscribe(x => {
            result = x;
            cts.Cancel();
        {
            try
            {
                await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { }
        }
    }
    return result;
}

不太干净……更糟…… 我也尝试过超时扩展。但由于这是一次订阅,我仍然需要等待一段时间才能处理订阅。唯一的区别是 OnError 会取消本地 token ,而不是 CancelAfter 的内置机制。

有没有更好/更简洁(更依赖 Rx)的方法来做到这一点?

谢谢!

最佳答案

你可以试试:

var values = await _OnReaderEvent
  .Where(r => r.PlaceId == placeId)
  .Buffer(waitFor, 1)
  .FirstAsync(); // get list of matching elements during waitFor time

return values.FirstOrDefault(); // return first element or null if the list is empty

关于c# - 接收 : Wait for first item for a period of time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46524370/

相关文章:

c# - 以编程方式生成属性

c# - C# 中的可区分联合

c# - C# WebBrowser 对象中的 HTML/Javascript 文件本地加载未正确加载?

c# - 线程和隐式内存屏障

system.reactive - Rx - 几个生产者/一个消费者

c# - NHunspell 拼写检查器中的错误

c# - 包装一个使用基于事件的异步模式的库,用于 Async/Await

c# - 没有 TaskCompletionSource 的任务链?

c# - .Net Reactive Extensions Framework (Rx) 是否考虑拓扑顺序?

c# - 如何设置 Observable 来监视属性?