c# - 创建带有心跳的任务

标签 c# .net task-parallel-library async-await azureservicebus

我想运行一个 Task它有一个“heartbeat”,它会在特定的时间间隔内持续运行,直到任务完成。

我认为像这样的扩展方法会很好用:

public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken)

例如:

public class Program {
    public static void Main() {
        var cancelTokenSource = new CancellationTokenSource();
        var cancelToken = cancelTokenSource.Token;
        var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken);
        withHeartbeatTask.Wait();
        Console.WriteLine("Long running task completed!");
        Console.ReadLine()
    }

    private static void SomeLongRunningTask() {
        Console.WriteLine("Starting long task");
        Thread.Sleep(TimeSpan.FromSeconds(9.5));
    }
    private static int _heartbeatCount = 0;
    private static void PerformHeartbeat(CancellationToken cancellationToken) {
        Console.WriteLine("Heartbeat {0}", ++_heartbeatCount);
    }
}

这个程序应该输出:

Starting long task
Heartbeat 1
Heartbeat 2
Heartbeat 3
Heartbeat 4
Heartbeat 5
Heartbeat 6
Heartbeat 7
Heartbeat 8
Heartbeat 9
Long running task completed!

请注意,它不应(在正常情况下)输出“心跳 10”,因为心跳在初始超时(即 1 秒)后开始。同样,如果任务花费的时间少于心跳间隔,则根本不应发生心跳。

什么是实现它的好方法?

背景信息:我有一个正在监听 Azure Service Bus 的服务队列。我不想Complete消息(这会将其从队列中永久删除)直到我完成处理它,这可能需要比最大消息更长的时间 LockDuration 5分钟。因此,我需要使用这种心跳方法来调用 RenewLockAsync在锁定持续时间到期之前,以便消息在进行冗长处理时不会超时。

最佳答案

这是我的尝试:

public static class TaskExtensions {
    /// <summary>
    /// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
    /// </summary>
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) {
            return;
        }

        var stopHeartbeatSource = new CancellationTokenSource();
        cancellationToken.Register(stopHeartbeatSource.Cancel);

        await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token));
        stopHeartbeatSource.Cancel();
    }
        
    private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            try {
                await Task.Delay(interval, cancellationToken);
                if (!cancellationToken.IsCancellationRequested) {
                    heartbeatAction(cancellationToken);
                }
            }
            catch (TaskCanceledException tce) {
                if (tce.CancellationToken == cancellationToken) {
                    // Totally expected
                    break;
                }
                throw;
            }
        }
    }
}

或者稍微调整一下,您甚至可以使心跳异步,如下所示:

    /// <summary>
    /// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
    /// </summary>
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) {
            return;
        }

        var stopHeartbeatSource = new CancellationTokenSource();
        cancellationToken.Register(stopHeartbeatSource.Cancel);

        await Task.WhenAll(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token));

        if (!stopHeartbeatSource.IsCancellationRequested) {
            stopHeartbeatSource.Cancel();
        }
    }

    public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) {
        return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None);
    }

    private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            try {
                await Task.Delay(interval, cancellationToken);
                if (!cancellationToken.IsCancellationRequested) {
                    await heartbeatTaskFactory(cancellationToken);
                }
            }
            catch (TaskCanceledException tce) {
                if (tce.CancellationToken == cancellationToken) {
                    // Totally expected
                    break;
                }
                throw;
            }
        }
    }

这将允许您将示例代码更改为如下内容:

private static async Task PerformHeartbeat(CancellationToken cancellationToken) {
    Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount);
    await Task.Delay(1000, cancellationToken);
    Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount);
}

PerformHeartbeat 可以用类似 RenewLockAsync 的异步调用代替这样您就不必使用像 RenewLock 这样的阻塞调用来浪费线程时间行动方法需要。

我是 answering my own question per SO guidelines ,但我也愿意接受更优雅的方法来解决这个问题。

关于c# - 创建带有心跳的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17115385/

相关文章:

.net - .net vNext 到底是什么。 .net core vs vNext 差异/解释

.net - PLINQ 是否尊重 SynchronizationContext?

c# - 使用 Control.Invoke 的死锁?

c# - 通过使用任务加快处理改进

c# - 绑定(bind)到列表的计数,其中 Typeof

c# - 使用 BitmapEncoder 生成时如何使 GIF 循环重复

c# - 如何在asp.net web api中检测操作系统、浏览器、设备名称等设备信息?

c# - 为什么有两个目录分隔符?

c# - ASP.NET 成员身份提供程序,配置不正确 - 无法打开网站管理工具

c# - 如何使用 TaskCompletionSource.SetException 保留等待行为?