我有以下设置
IObservable<Data> source = ...;
source
.Select(data=>VeryExpensiveOperation(data))
.Subscribe(data=>Console.WriteLine(data));
通常情况下,事件会在合理的时间范围内分开。
想象一下用户更新表单中的文本框。我们的 VeryExpensiveOperation
可能需要 5 秒才能完成,同时需要一个沙漏
显示在屏幕上。
但是如果在 5 秒内用户再次更新文本框
我想向当前的 VeryExpensiveOperation
发送取消
在新的开始之前。
我会想象这样的场景
source
.SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
.Subscribe(data=>Console.WriteLine(data));
因此每次调用 lambda 时都会使用一个 cancelToken 来调用,它可以是
用于管理取消 Task
。但是现在我们正在混合使用 Task、CancellationToken 和 RX。
不太确定如何将它们组合在一起。任何建议。
Bonus Points 了解如何使用 XUnit 测试运算符 :)
第一次尝试
public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
return This
.ObserveOn(Scheduler.Default)
.Select(v=>{
tokenSource.Cancel();
tokenSource=new CancellationTokenSource();
return new {tokenSource.Token, v};
})
.SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
}
尚未测试。我希望未完成的任务会生成一个 IObservable,该 IObservable 会在不触发任何 OnNext
事件的情况下完成。
最佳答案
您必须将 VeryExpensiveOperation
建模为可取消的异步事物。 Task
或 IObservable
。我假设这是一个带有 CancellationToken
的任务:
Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);
然后你这样做:
source
.Select(item => Observable.DeferAsync(async token =>
{
// do not yield the observable until after the operation is completed
// (ie do not just do VeryExpensiveOperation(...).ToObservable())
// because DeferAsync() will dispose of the token source as soon
// as you provide the observable (instead of when the observable completes)
var result = await VeryExpensiveOperationAsync(item, token);
return Observable.Return(result);
})
.Switch();
Select
只是创建了一个延迟的可观察对象,当它被订阅时,将创建一个 token 并启动操作。如果在操作完成之前取消订阅可观察对象,则 token 将被取消。
Switch
订阅来自 Select
的每个新可观察值,取消订阅它订阅的前一个可观察值。
这样就有了你想要的效果。
附言这很容易测试。只需提供一个模拟源和一个使用单元测试提供的 TaskCompletetionSource
的模拟 VeryExpensiveOperation
,这样单元测试就可以准确控制何时生成新源项以及何时完成任务.像这样:
void SomeTest()
{
// create a test source where the values are how long
// the mock operation should wait to do its work.
var source = _testScheduler.CreateColdObservable<int>(...);
// records the actions (whether they completed or canceled)
List<bool> mockActionsCompleted = new List<bool>();
var resultStream = source.SelectWithCancellation((token, delay) =>
{
var tcs = new TaskCompletionSource<string>();
var tokenRegistration = new SingleAssignmentDisposable();
// schedule an action to complete the task
var d = _testScheduler.ScheduleRelative(delay, () =>
{
mockActionsCompleted.Add(true);
tcs.SetResult("done " + delay);
// stop listening to the token
tokenRegistration.Dispose();
});
// listen to the token and cancel the task if the token signals
tokenRegistration.Disposable = token.Register(() =>
{
mockActionsCompleted.Add(false);
tcs.TrySetCancelled();
// cancel the scheduled task
d.Dispose();
});
return tcs.Task;
});
// subscribe to resultStream
// start the scheduler
// assert the mockActionsCompleted has the correct sequence
// assert the results observed were what you expected.
}
由于动态安排的新操作,您可能会在使用 testScheduler.Start()
时遇到麻烦。使用 testScheduler.AdvanceBy(1)
的 while 循环可能会更好。
关于c# - 如果在下一个事件到达之前未完成,如何取消 RX 中的选择,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17836743/