如我最初的问题中所述(参见 Correlate interdependent Event Streams with RX.Net)我有一个 RX.net 事件流,只要未触发某个其他事件(基本上是“处理更改-*事件作为只要系统已连接,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件。
但是,虽然这可以顺利处理新事件,但我如何取消/发出信号取消正在进行的 .OnNext() 调用?
最佳答案
由于您的观察者已经被编写为接受 CancellationToken
,我们可以修改您的 Rx 流以提供一个连同事件数据。我们将使用 Rx CancellationDisposable
,只要流被取消订阅,我们就会处理它。
// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
return Observable.Using(
() => new CancellationDisposable(),
cts => source.Select(item => Tuple.Create(cts.Token, item)));
}
将其与另一个问题的解决方案放在一起:
DataSourceLoaded
.SelectMany(_ => DataSourceFieldChanged
.Throttle(x)
.CancelOnUnsubscribe()
.TakeUntil(DataSourceLoaded))
.Subscribe(c => handler(c.Item1, c.Item2));
当 TakeUntil
子句被触发时,它将取消订阅 CancelOnUnsubscribe
observable,这将依次处理 CancellationDisposable
并导致要取消的 token 。您的观察者可以监视此 token 并在发生这种情况时停止其工作。
关于c# - 取消RX.Net Observer正在进行的OnNext方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26716527/