c# - 如何创建一个 Observable Timer,它调用一个方法并在该方法运行到完成时阻止取消?

标签 c# system.reactive

我的要求:

  1. 在指定的时间间隔运行方法 DoWork。
  2. 如果在对 DoWork 的调用之间调用了停止,则只需停止计时器。
  3. 如果在 DoWork 运行时调用停止,则阻塞直到 DoWork 完成。
  4. 如果调用 stop 后 DoWork 完成时间过长,则超时。

我有一个到目前为止似乎有效的解决方案,但我对它不是很满意,并且认为我可能遗漏了一些东西。以下是我的测试应用程序中的 void Main:

var source = new CancellationTokenSource();

// Create an observable sequence for the Cancel event.
var cancelObservable = Observable.Create<Int64>(o =>
{
    source.Token.Register(() =>
    {
        Console.WriteLine("Start on canceled handler.");
        o.OnNext(1);
        Console.WriteLine("End on canceled handler.");
    });

    return Disposable.Empty;
});

var observable =
    // Create observable timer.
    Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), Scheduler.Default)
        // Merge with the cancel observable so we have a composite that 
        // generates an event every 10 seconds AND immediately when a cancel is requested.
        .Merge(cancelObservable)
        // This is what I ended up doing instead of disposing the timer so that I could wait
        // for the sequence to finish, including DoWork.
        .TakeWhile(i => !source.IsCancellationRequested)
        // I could put this in an observer, but this way exceptions could be caught and handled
        // or the results of the work could be fed to a subscriber.
        .Do(l =>
        {
            Console.WriteLine("Start DoWork.");
            Thread.Sleep(TimeSpan.FromSeconds(5));
            Console.WriteLine("Finish DoWork.");
        });

var published = observable.Publish();

var disposable = published.Connect();

// Press key between Start DoWork and Finish DoWork to test the cancellation while
// running DoWork.
// Press key between Finish DoWork and Start DoWork to test cancellation between
// events.
Console.ReadKey();

// I doubt this is good practice, but I was finding that o.OnNext was blocking
// inside of register, and the timeout wouldn't work if I blocked here before
// I set it up.
Task.Factory.StartNew(source.Cancel);

// Is there a preferred way to block until a sequence is finished? My experience
// is there's a timing issue if Cancel finishes fast enough the sequence may already
// be finished by the time I get here and .Wait() complains that the sequence contains
// no elements.
published.Timeout(TimeSpan.FromSeconds(1))
    .ForEach(i => { });

disposable.Dispose();

Console.WriteLine("All finished! Press any key to continue.");
Console.ReadKey();

最佳答案

首先,在您的cancelObservable 中,确保将Token.Register 的结果返回为您的disposable 而不是返回Disposable.Empty

这里有一个很好的扩展方法,可以将 CancellationTokens 转换为可观察对象:

public static IObservable<Unit> AsObservable(this CancellationToken token, IScheduler scheduler)
{
    return Observable.Create<Unit>(observer =>
    {
        var d1 = new SingleAssignmentDisposable();
        return new CompositeDisposable(d1, token.Register(() =>
            {
                d1.Disposable = scheduler.Schedule(() =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
            }));
    });
}

现在,根据您的实际要求:

public IObservable<Unit> ScheduleWork(IObservable<Unit> cancelSignal)
{
    // Performs work on an interval
    // stops the timer (but finishes any work in progress) when the cancelSignal is received
    var workTimer = Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
        .TakeUntil(cancelSignal)
        .Select(_ =>
        {
            DoWork();
            return Unit.Default;
        })
        .IgnoreElements();

    // starts a timer after cancellation that will eventually throw a timeout exception.
    var timeoutAfterCancelSignal = cancelSignal
        .SelectMany(c => Observable.Never<Unit>().Timeout(TimeSpan.FromSeconds(5)));

    // Use Amb to listen to both the workTimer
    // and the timeoutAfterCancelSignal
    // Since neither produce any data we are really just
    // listening to see which will complete first.
    // if the workTimer completes before the timeout
    // then Amb will complete without error.
    // However if the timeout expires first, then Amb
    // will produce an error
    return Observable.Amb(workTimer, timeoutAfterCancelSignal);
}

// Usage
var cts = new CancellationTokenSource();
var s = ScheduleWork(cts.Token.AsObservable(Scheduler.Default));

using (var finishedSignal = new ManualResetSlim())
{
    s.Finally(finishedSignal.Set).Subscribe(
        _ => { /* will never be called */},
        error => { /* handle error */ },
        () => { /* canceled without error */ } );

    Console.ReadKey();
    cts.Cancel();

    finishedSignal.Wait();
}

请注意,除了取消标记,您还可以:

var cancelSignal = new AsyncSubject<Unit>();
var s = ScheduleWork(cancelSignal);

// .. to cancel ..
Console.ReadKey();
cancelSignal.OnNext(Unit.Default);
cancelSignal.OnCompleted();

关于c# - 如何创建一个 Observable Timer,它调用一个方法并在该方法运行到完成时阻止取消?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20714219/

相关文章:

c# - 对具有动态操作数的空合并运算符进行类型检查

c# - Windows 桌面应用程序也可以在 Windows Phone 上运行吗?

c# - MVVM - WPF 如何将我的 View 绑定(bind)到我的 View 模型?

c# - react 性扩展 : buffer until subscriber is idle

c# - Roland Pheasant 使用 DynamicData 实现尾部方法

c# - 无论 TimeSpan 或计数如何,Reactive Extensions Buffer 每次都会执行

c# - 无法访问简单文件 "because it is being used by another process"

c# - 从类库注册Web API Controller

c# - .Net 5.0 中对 WPF 的响应式扩展支持

c# - 异步一次性创建