c# - 如果在下一个事件到达之前未完成,如何取消 RX 中的选择

标签 c# system.reactive

我有以下设置

IObservable<Data> source = ...;

source
    .Select(data=>VeryExpensiveOperation(data))
    .Subscribe(data=>Console.WriteLine(data));

通常情况下,事件会在合理的时间范围内分开。 想象一下用户更新表单中的文本框。我们的 VeryExpensiveOperation 可能需要 5 秒才能完成,同时需要一个沙漏 显示在屏幕上。

但是如果在 5 秒内用户再次更新文本框 我想向当前的 VeryExpensiveOperation 发送取消 在新的开始之前。

我会想象这样的场景

source
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
    .Subscribe(data=>Console.WriteLine(data));

因此每次调用 lambda 时都会使用一个 cancelToken 来调用,它可以是 用于管理取消 Task。但是现在我们正在混合使用 Task、CancellationToken 和 RX。 不太确定如何将它们组合在一起。任何建议。

Bonus Points 了解如何使用 XUnit 测试运算符 :)

第一次尝试

    public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();

        return This
            .ObserveOn(Scheduler.Default)
            .Select(v=>{
                tokenSource.Cancel();
                tokenSource=new CancellationTokenSource();
                return new {tokenSource.Token, v};
            })
            .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
    }

尚未测试。我希望未完成的任务会生成一个 IObservable,该 IObservable 会在不触发任何 OnNext 事件的情况下完成。

最佳答案

您必须将 VeryExpensiveOperation 建模为可取消的异步事物。 TaskIObservable。我假设这是一个带有 CancellationToken 的任务:

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);

然后你这样做:

source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (ie do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    })
    .Switch();

Select 只是创建了一个延迟的可观察对象,当它被订阅时,将创建一个 token 并启动操作。如果在操作完成之前取消订阅可观察对象,则 token 将被取消。

Switch 订阅来自 Select 的每个新可观察值,取消订阅它订阅的前一个可观察值。

这样就有了你想要的效果。

附言这很容易测试。只需提供一个模拟源和一个使用单元测试提供的 TaskCompletetionSource 的模拟 VeryExpensiveOperation,这样单元测试就可以准确控制何时生成新源项以及何时完成任务.像这样:

void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);

    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();

        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
           mockActionsCompleted.Add(true);
           tcs.SetResult("done " + delay);
           // stop listening to the token
           tokenRegistration.Dispose();
        });

        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
           mockActionsCompleted.Add(false);
           tcs.TrySetCancelled();
           // cancel the scheduled task
           d.Dispose();
        });

        return tcs.Task;
    });

    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}

由于动态安排的新操作,您可能会在使用 testScheduler.Start() 时遇到麻烦。使用 testScheduler.AdvanceBy(1) 的 while 循环可能会更好。

关于c# - 如果在下一个事件到达之前未完成,如何取消 RX 中的选择,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17836743/

相关文章:

c# - 我什么时候启动和处置我的用户控件?

c# - RX : How to bind an IObservable<object> to a property (ReactiveUI)

c# - 我怎样才能使这个 Observable 更可重用?

c# - ASP.NET 中的日历控件

c# - Linq 其中值在数组中

C# 通用运算符 - RTTI 方法

xamarin.ios - Reactive Extensions 2.1 PCL 与 Xamarin 兼容吗?

.net - 使用 RX Throttle 时的跨线程异常

c# - 在应用程序退出时处理 RX 线程

c# - HttpClient PostAsync 不返回