c# - Hangfire in a single application on multiple physical servers

Tags: c# .net hangfire hangfire-sql

I'm running Hangfire in a single web application. The application runs on 2 physical servers, but Hangfire uses a single database.

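At the moment I create one Hangfire server per queue, because each queue must run only 1 worker at a time and its jobs must run in order. I set them up like this: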

// core
services.AddHangfire(options =>
{
    options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
    options.UseSimpleAssemblyNameTypeSerializer();
    options.UseRecommendedSerializerSettings();
    options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});

// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
    options.ServerName = "workflow-queue";
    options.WorkerCount = 1;
    options.Queues = new string[] { "workflow-queue" };
    options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "alert-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "alert-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "trigger-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "trigger-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "report-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "report-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "maintenance";
    options.WorkerCount = 5;
    options.Queues = new string[] { "maintenance" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});
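Why a single-worker queue can still run two jobs at once: every instance of the web app executes all of these AddHangfireServer calls at startup, and all the resulting servers fetch from the same SQL Server database. WorkerCount is a per-server setting, so the number of workers that can pick jobs from a queue adds up across instances. A minimal arithmetic sketch in plain C# (no Hangfire dependency; ServerRegistration and QueueConcurrency are hypothetical names, the values are copied from the configuration above):

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: one entry per AddHangfireServer call in the config above.
public record ServerRegistration(string Queue, int WorkerCount);

public static class QueueConcurrency
{
    // What a single web app instance registers at startup.
    public static readonly IReadOnlyList<ServerRegistration> PerInstance = new[]
    {
        new ServerRegistration("workflow-queue", 1),
        new ServerRegistration("alert-schedule", 1),
        new ServerRegistration("trigger-schedule", 1),
        new ServerRegistration("report-schedule", 1),
        new ServerRegistration("maintenance", 5),
    };

    // Each instance starts its own servers against the same storage,
    // so the workers able to fetch from a queue add up across instances.
    public static Dictionary<string, int> WorkersPerQueue(int instanceCount) =>
        PerInstance
            .GroupBy(r => r.Queue)
            .ToDictionary(g => g.Key, g => g.Sum(r => r.WorkerCount) * instanceCount);
}
```

With the two physical servers described above, WorkersPerQueue(2) gives 2 workers for each of the "single-worker" queues and 10 for maintenance, so two jobs from the same queue can be picked up at the same time.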

My problem is that it generates multiple queues, on servers with different ports.

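In my code I then try to stop a job from running if it is already enqueued or retrying, but if the job is running on a different physical server, it is not found and gets enqueued again.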

Here is the code that checks whether it is already running:

public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
    var disableJob = false;
    var monitoringApi = JobStorage.Current.GetMonitoringApi();

    // get the jobId, method and queue using performContext
    var jobId = context.BackgroundJob.Id;
    var methodInfo = context.BackgroundJob.Job.Method;
    var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(methodInfo, typeof(QueueAttribute));
    
    // enqueuedJobs
    var enqueuedjobStatesToCheck = new[] { "Processing" };
    var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
    var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));

    if (enqueuedJobsAlready > 0)
        disableJob = true;

    // scheduledJobs
    if (!disableJob)
    {
        // check whether another copy of this method is already scheduled
        var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
        var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (scheduledJobsAlready > 0)
            disableJob = true;
    }

    // failedJobs
    if (!disableJob)
    {
        var failedJobs = monitoringApi.FailedJobs(0, 1000);
        var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (failedJobsAlready > 0)
            disableJob = true;
    }

    // if disableJob is true, remove the current job altogether; otherwise it would be logged as a successful run
    if (disableJob)
    {
        // use hangfire delete, for cleanup
        BackgroundJob.Delete(jobId);

        // create our sqlBuilder to remove the entries altogether including the count
        var sqlBuilder = new SqlBuilder()
            .DELETE_FROM("Hangfire.[Job]")
            .WHERE("[Id] = {0};", jobId);

        sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");

        using (var cmd = _context.CreateCommand(sqlBuilder))
            await cmd.ExecuteNonQueryAsync();
        
        return true;
    }

    return false;
}
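The check above reads only EnqueuedJobs, ScheduledJobs and FailedJobs. As far as I know, once a worker on the other machine has fetched a job, the monitoring API reports it through ProcessingJobs rather than EnqueuedJobs, so filtering the enqueued list on the "Processing" state may rarely or never match. Below is a plain C# model of the same duplicate rule with the processing list included; JobStateName, JobSnapshot and DuplicateCheck are hypothetical stand-ins, and in real code the snapshots would come from IMonitoringApi.ProcessingJobs, ScheduledJobs and FailedJobs. It is still check-then-act, so two workers can pass the check at the same moment.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the state of a job as reported by the monitoring lists.
public enum JobStateName { Enqueued, Processing, Scheduled, Failed, Succeeded }

// Hypothetical snapshot: job id, method name, current state.
public record JobSnapshot(string Id, string Method, JobStateName State);

public static class DuplicateCheck
{
    // States in which another copy of the same method means "skip this run".
    // Processing is included: a job already fetched by a worker on the
    // other machine is no longer waiting in the queue.
    static readonly HashSet<JobStateName> Blocking = new()
    {
        JobStateName.Processing,
        JobStateName.Scheduled,
        JobStateName.Failed,
    };

    public static bool IsDuplicate(string currentJobId, string method, IEnumerable<JobSnapshot> jobs) =>
        jobs.Any(j => j.Id != currentJobId && j.Method == method && Blocking.Contains(j.State));
}
```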

Each method has attributes similar to the ones below:

public interface IAlertScheduleService
{
    [Hangfire.Queue("alert-schedule")]
    [Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
    Task RunAllAsync(PerformContext context);
}

A simple implementation of the interface:

public class AlertScheduleService : IAlertScheduleService
{
    public async Task RunAllAsync(PerformContext context)
    {
        // IsAlreadyQueuedAsync returns Task<bool>, so it has to be awaited
        if (await IsAlreadyQueuedAsync(context))
            return;

        // it isn't queued anywhere else, so run it here....
    }
}

This is how I add the scheduled tasks:

//// our recurring jobs
//// set these to run hourly, so they can play "catch-up" if needed
RecurringJob.AddOrUpdate<IAlertScheduleService>(e => e.RunAllAsync(null), Cron.Hourly(0), queue: "alert-schedule");

Why does this happen? How can I stop it from happening?

Best answer

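This is a bit of a shot in the dark: it prevents a job from being enqueued if a job is already enqueued in the same queue. The try-catch logic is quite ugly, but I don't have a better idea right now... Also, I'm really not sure the locking logic always prevents having two jobs in EnqueuedState, but it should help anyway. It could be combined with an IApplyStateFilter.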

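using System;
using Hangfire.States;   // IElectStateFilter, ElectStateContext, EnqueuedState, DeletedState
using Hangfire.Storage;  // DistributedLockTimeoutException

public class DoNotQueueIfAlreadyQueued : IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState es)
        {
            IDisposable distributedLock = null;
            try
            {
                // keep retrying until this server holds the per-queue lock
                while (distributedLock == null)
                {
                    try
                    {
                        distributedLock = context.Connection.AcquireDistributedLock($"{nameof(DoNotQueueIfAlreadyQueued)}-{es.Queue}", TimeSpan.FromSeconds(1));
                    }
                    catch (DistributedLockTimeoutException)
                    {
                        // the lock is held elsewhere; try again
                    }
                }

                // if a job is already waiting in this queue, drop the new one instead of enqueuing it
                var m = context.Storage.GetMonitoringApi();
                if (m.EnqueuedCount(es.Queue) > 0)
                {
                    context.CandidateState = new DeletedState();
                }
            }
            finally
            {
                distributedLock?.Dispose();
            }
        }
    }
}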
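The filter's rule is: take a per-queue lock, look at the queue, and drop the new job if something is already waiting. A minimal in-process sketch of that rule, assuming a single process: a C# lock stands in for AcquireDistributedLock, and the hypothetical QueueStore class stands in for Hangfire storage. One gate covers all queues to keep it short, whereas the answer locks per queue name. Unlike the filter, here the check and the enqueue happen under the same lock.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the storage the filter talks to.
public sealed class QueueStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, Queue<string>> _queues = new();

    // Number of jobs dropped (the counterpart of CandidateState = new DeletedState()).
    public int Deleted { get; private set; }

    // Returns true if the job was enqueued, false if it was dropped.
    public bool TryEnqueue(string queue, string jobId)
    {
        lock (_gate) // check and enqueue happen atomically
        {
            if (!_queues.TryGetValue(queue, out var q))
                _queues[queue] = q = new Queue<string>();

            if (q.Count > 0)
            {
                Deleted++;
                return false;
            }

            q.Enqueue(jobId);
            return true;
        }
    }

    public int Count(string queue)
    {
        lock (_gate)
            return _queues.TryGetValue(queue, out var q) ? q.Count : 0;
    }
}
```

As far as I can tell, in the real filter the lock is released when OnStateElection returns, before Hangfire applies the elected EnqueuedState, so two servers can still both see an empty queue. That is the gap the answer itself is unsure about.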

The filter can be declared as in this answer.
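DoNotQueueIfAlreadyQueued implements IElectStateFilter but does not derive from JobFilterAttribute, so it cannot be put on a method as an attribute; it would be registered globally instead. A configuration fragment, assuming Hangfire 1.7 or later (either line is enough; using both would register the filter twice):

```csharp
// Option 1: inside the services.AddHangfire(options => { ... }) callback from the question
options.UseFilter(new DoNotQueueIfAlreadyQueued());

// Option 2: anywhere during startup, via the static global filter collection
GlobalJobFilters.Filters.Add(new DoNotQueueIfAlreadyQueued());
```

Registered this way, the filter runs for every job, so the maintenance queue (5 workers) would also drop a new job whenever another one is already waiting in it.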

Regarding c# - Hangfire in a single application on multiple physical servers, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70832860/
