c# - 取消RX.Net Observer正在进行的OnNext方法

标签 c# events system.reactive reactive-programming cancellation

如我最初的问题中所述(参见 Correlate interdependent Event Streams with RX.Net)我有一个 RX.net 事件流,只要未触发某个其他事件(基本上是“处理更改-*事件作为只要系统已连接,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件。

但是,虽然这可以顺利处理新事件,但我如何取消/发出信号取消正在进行的 .OnNext() 调用?

最佳答案

由于您的观察者已经被编写为接受 CancellationToken,我们可以修改您的 Rx 流以提供一个连同事件数据。我们将使用 Rx CancellationDisposable,只要流被取消订阅,我们就会处理它。

// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
    return Observable.Using(
        () => new CancellationDisposable(),
        cts => source.Select(item => Tuple.Create(cts.Token, item)));
}

将其与另一个问题的解决方案放在一起:

DataSourceLoaded
    .SelectMany(_ => DataSourceFieldChanged
        .Throttle(x)
        .CancelOnUnsubscribe()
        .TakeUntil(DataSourceLoaded))
    .Subscribe(c => handler(c.Item1, c.Item2));

TakeUntil 子句被触发时,它将取消订阅 CancelOnUnsubscribe observable,这将依次处理 CancellationDisposable 并导致要取消的 token 。您的观察者可以监视此 token 并在发生这种情况时停止其工作。

关于c# - 取消RX.Net Observer正在进行的OnNext方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26716527/

相关文章:

c# - 为什么 C# EventHandler 等于 MSN 示例中的事件?

c# - 使用C#在同一个进程中执行多个命令行

c# - LINQ:分组后从实体集合中选择最小值和最大值

c# - 使用 nunit 测试事件

c# - 如何检查事件是否已触发?

c# - 结合依赖的 IObservables

java - 读取文件添加到文件夹的时间

c# - 触发器返回一个结果集和/或在 SET NOCOUNT OFF 的情况下运行,而另一个未完成的结果集处于事件状态

c# - 将 LINQ 与异步混合(从种子获取有效负载)

system.reactive - 如何在 Observable.Do 扩展方法中传递异步方法?