azure - Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案?

标签 azure asp.net-core long-running-processes azure-appservice

在不需要数据库且无需对此应用程序之外的任何内容进行 IO 的应用程序中,在 Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案是什么?这是一个计算任务。
具体来说,以下是不可靠的,需要一个解决方案。

[Route("service")]
[HttpPost]
public Outbound Post(Inbound inbound)
{
    Debug.Assert(inbound.Message.Equals("Hello server."));
    Outbound outbound = new Outbound();
    long Billion = 1000000000;
    for (long i = 0; i < 33 * Billion; i++) // 230 seconds
        ;
    outbound.Message = String.Format("The server processed inbound object.");
    return outbound;
}
这有时会向 HttpClient 返回一个空对象。 (未显示)。较小的工作量总是会成功。例如,30 亿次迭代总是成功的。一个更大的数字会很好,特别是 2400 亿是一个要求。
我认为在 2020 年,带有 .NET Core 的 Azure App Service 的一个合理目标可能是在 8 个子线程的帮助下将父线程数提高到 2400 亿,因此每个子线程数达到 300 亿,而父线程划分 8 M 字节入站对象转换为入站到每个子项的较小对象。每个子进程收到一个 1 M 字节的入站数据,并将 1 M 字节的出站数据返回给父级。父节点将结果重新组装成 8 M 字节的出站。
显然,耗时将是单线程实现所需时间的 12.5%,或 1/8,或八分之一。与计算时间相比,切割和重新组装对象的时间很小。我假设传输对象的时间与计算时间相比非常小,因此 12.5% 的期望值大致准确。
如果我能得到 4 或 8 个内核,那就太好了。如果我能得到线程,让我说一个核心周期的 50%,那么我可能需要 8 或 16 个线程。如果每个线程给我 33% 的内核周期,那么我需要 12 或 24 个线程。
我正在考虑 BackgroundService类,但我正在寻找确认这是正确的方法。微软说...
BackgroundService is a base class for implementing a long running IHostedService.
显然,如果某项长时间运行,最好通过 System.Threading 使用多核使其更快完成。但是这个documentation好像提到System.Threading仅在通过 System.Threading.Timer 启动任务的上下文中.我的示例代码显示我的应用程序中不需要计时器。 HTTP POST 将作为工作的机会。通常我会使用 System.Threading.Thread实例化多个对象以使用多个内核。我发现在需要很长时间的工作解决方案的上下文中,没有提及多核是一个明显的遗漏,但可能有某种原因 Azure 应用服务不处理这个问题。也许我只是无法在教程和文档中找到它。
任务的启动是图示的 HTTP POST Controller 。假设最长的作业需要 10 分钟。 HTTP 客户端(未显示)将超时限制设置为 1000 秒,这远远超过 10 分钟(600 秒),以便有安全边际。 HttpClient.Timeout是相关属性。目前我认为 HTTP 超时是一个真正的限制;而不是某种非约束性(假限制),这样一些其他约束会导致用户等待 9 分钟并收到错误消息。一个真正的绑定(bind)限制是一个我可以说“但对于这个超时它会成功”的限制。如果 HTTP 超时不是真正的绑定(bind)限制,并且还有其他限制系统的东西,我可以调整我的 HTTP Controller ,改为使用三 (3) 个 POST 方法。因此 POST1 意味着使用入站对象启动任务。 POST2 的意思是告诉我它是否完成了。 POST3 意味着给我出站对象。
在不需要数据库且无需对此应用程序之外的任何内容进行 IO 的应用程序中,在 Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案是什么?这是一个计算任务。

最佳答案

序幕
几年前遇到了一个非常相似的问题。我们需要一种可以处理大量数据的服务。有时处理需要 10 秒,有时可能需要一个小时。
起初,我们按照您的问题说明了这一点:向服务发送请求,服务处理请求中的数据并在完成后返回响应。
手头的问题
当作业只需要大约一分钟或更短的时间时,这很好,但是超过此时间,服务器将关闭 session 并且调用者将报告错误。
服务器在放弃请求之前有大约 2 分钟的默认时间来产生响应。它不会退出请求的处理......但它会退出 HTTP session 。您在 HttpClient 上设置什么参数并不重要,服务器是委派多长时间太长的那个。
问题原因
这一切都是有充分理由的。服务器套接字非常昂贵。你的数量有限。服务器试图通过切断时间超过指定时间的请求来保护您的服务,以避免套接字饥饿问题。
通常,您希望 HTTP 请求只需要几毫秒。如果它们花费的时间比这更长,并且您的服务必须以高速率满足其他请求,您最终会遇到套接字问题。
解决方案
我们决定走IHostedService的路线,特别是 BackgroundService .我们将此服务与队列结合使用。通过这种方式,您可以设置作业队列和 BackgroundService将一次处理一个(在某些情况下,我们有一次处理多个队列项目的服务,在其他情况下,我们水平扩展产生两个或更多队列)。
为什么 ASP.NET Core 服务运行 BackgroundService ?我想在不与任何特定于 Azure 的构造紧密耦合的情况下处理这个问题,以防我们需要从 Azure 转移到其他一些云服务(回到那天我们出于其他原因考虑这样做。)
这对我们来说效果很好,从那以后我们没有看到任何问题。
工作流程是这样的:

  • 调用者向服务发送请求,并带有一些参数
  • 服务生成“作业”对象并立即通过 202(已接受)响应返回 ID
  • 服务将此作业放入由 BackgroundService 维护的队列中。
  • 调用者可以使用此作业 ID
  • 查询作业状态并获取有关已完成多少以及剩余多少的信息。
  • 服务完成作业,将作业置于“已完成”状态并返回等待队列以产生更多作业

  • 请记住,您的服务能够在运行多个实例的情况下进行水平扩展。在这种情况下,我使用 Redis 缓存来存储作业的状态,以便所有实例共享相同的状态。
    如果您没有可用的 Redis 缓存,我还添加了“内存缓存”选项以在本地测试内容。您可以在服务器上运行“Memory Cache”服务,只要知道如果它扩展,那么您的数据就会不一致。
    例子
    由于我已婚并有 child ,所以在每个人都上床 sleep 后的周五晚上我真的不会做太多事情,所以我花了一些时间整理了一个示例,您可以尝试一下。全solution也可供您试用。
    QueuedBackgroundService.cs
    此类实现有两个特定目的:一个是从队列中读取(BackgroundService 实现),另一个是写入队列(IQueuedBackgroundService 实现)。
        public interface IQueuedBackgroundService
        {
            Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters);
        }
    
        public sealed class QueuedBackgroundService : BackgroundService, IQueuedBackgroundService
        {
            private sealed class JobQueueItem
            {
                public string JobId { get; set; }
                public JobParametersModel JobParameters { get; set; }
            }
    
            private readonly IComputationWorkService _workService;
            private readonly IComputationJobStatusService _jobStatusService;
    
            // Shared between BackgroundService and IQueuedBackgroundService.
            // The queueing mechanism could be moved out to a singleton service. I am doing
            // it this way for simplicity's sake.
            private static readonly ConcurrentQueue<JobQueueItem> _queue =
                new ConcurrentQueue<JobQueueItem>();
            private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    
            public QueuedBackgroundService(IComputationWorkService workService,
                IComputationJobStatusService jobStatusService)
            {
                _workService = workService;
                _jobStatusService = jobStatusService;
            }
    
            /// <summary>
            /// Transient method via IQueuedBackgroundService
            /// </summary>
            public async Task<JobCreatedModel> PostWorkItemAsync(JobParametersModel jobParameters)
            {
                var jobId = await _jobStatusService.CreateJobAsync(jobParameters).ConfigureAwait(false);
                _queue.Enqueue(new JobQueueItem { JobId = jobId, JobParameters = jobParameters });
                _signal.Release(); // signal for background service to start working on the job
                return new JobCreatedModel { JobId = jobId, QueuePosition = _queue.Count };
            }
    
            /// <summary>
            /// Long running task via BackgroundService
            /// </summary>
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while(!stoppingToken.IsCancellationRequested)
                {
                    JobQueueItem jobQueueItem = null;
                    try
                    {
                        // wait for the queue to signal there is something that needs to be done
                        await _signal.WaitAsync(stoppingToken).ConfigureAwait(false);
    
                        // dequeue the item
                        jobQueueItem = _queue.TryDequeue(out var workItem) ? workItem : null;
    
                        if(jobQueueItem != null)
                        {
                            // put the job in to a "processing" state
                            await _jobStatusService.UpdateJobStatusAsync(
                                jobQueueItem.JobId, JobStatus.Processing).ConfigureAwait(false);
    
                            // the heavy lifting is done here...
                            var result = await _workService.DoWorkAsync(
                                jobQueueItem.JobId, jobQueueItem.JobParameters,
                                stoppingToken).ConfigureAwait(false);
    
                            // store the result of the work and set the status to "finished"
                            await _jobStatusService.StoreJobResultAsync(
                                jobQueueItem.JobId, result, JobStatus.Success).ConfigureAwait(false);
                        }
                    }
                    catch(TaskCanceledException)
                    {
                        break;
                    }
                    catch(Exception ex)
                    {
                        try
                        {
                            // something went wrong. Put the job in to an errored state and continue on
                            await _jobStatusService.StoreJobResultAsync(jobQueueItem.JobId, new JobResultModel
                            {
                                Exception = new JobExceptionModel(ex)
                            }, JobStatus.Errored).ConfigureAwait(false);
                        }
                        catch(Exception)
                        {
                            // TODO: log this
                        }
                    }
                }
            }
        }
    
    它是这样注入(inject)的:
        services.AddHostedService<QueuedBackgroundService>();
        services.AddTransient<IQueuedBackgroundService, QueuedBackgroundService>();
    
    ComputationController.cs
    用于读/写作业的 Controller 如下所示:
        [ApiController, Route("api/[controller]")]
        public class ComputationController : ControllerBase
        {
            private readonly IQueuedBackgroundService _queuedBackgroundService;
            private readonly IComputationJobStatusService _computationJobStatusService;
    
            public ComputationController(
                IQueuedBackgroundService queuedBackgroundService,
                IComputationJobStatusService computationJobStatusService)
            {
                _queuedBackgroundService = queuedBackgroundService;
                _computationJobStatusService = computationJobStatusService;
            }
    
            [HttpPost, Route("beginComputation")]
            [ProducesResponseType(StatusCodes.Status202Accepted, Type = typeof(JobCreatedModel))]
            public async Task<IActionResult> BeginComputation([FromBody] JobParametersModel obj)
            {
                return Accepted(
                    await _queuedBackgroundService.PostWorkItemAsync(obj).ConfigureAwait(false));
            }
    
            [HttpGet, Route("computationStatus/{jobId}")]
            [ProducesResponseType(StatusCodes.Status200OK, Type = typeof(JobModel))]
            [ProducesResponseType(StatusCodes.Status404NotFound, Type = typeof(string))]
            public async Task<IActionResult> GetComputationResultAsync(string jobId)
            {
                var job = await _computationJobStatusService.GetJobAsync(jobId).ConfigureAwait(false);
                if(job != null)
                {
                    return Ok(job);
                }
                return NotFound($"Job with ID `{jobId}` not found");
            }
    
            [HttpGet, Route("getAllJobs")]
            [ProducesResponseType(StatusCodes.Status200OK,
                Type = typeof(IReadOnlyDictionary<string, JobModel>))]
            public async Task<IActionResult> GetAllJobsAsync()
            {
                return Ok(await _computationJobStatusService.GetAllJobsAsync().ConfigureAwait(false));
            }
    
            [HttpDelete, Route("clearAllJobs")]
            [ProducesResponseType(StatusCodes.Status200OK)]
            [ProducesResponseType(StatusCodes.Status401Unauthorized)]
            public async Task<IActionResult> ClearAllJobsAsync([FromQuery] string permission)
            {
                if(permission == "this is flakey security so this can be run as a public demo")
                {
                    await _computationJobStatusService.ClearAllJobsAsync().ConfigureAwait(false);
                    return Ok();
                }
                return Unauthorized();
            }
        }
    
    工作示例
    只要这个问题是活跃的,我就会maintain a working example你可以试试。对于此特定示例,您可以指定要运行的迭代次数。为了模拟长时间运行的工作,每次迭代为 1 秒。因此,如果您将迭代值设置为 60,它将运行该作业 60 秒。
    当它运行时,运行 computationStatus/{jobId}getAllJobs端点。您可以实时查看所有工作更新。
    这个例子远不是一个功能齐全的、涵盖所有边缘情况的、成熟的、可以投入生产的例子,但它是一个好的开始。
    结论
    在后端工作了几年后,我看到很多问题都是由于不了解后端的所有“规则”而产生的。希望这个答案能够对我过去遇到的问题有所了解,并希望这可以使您不必处理上述问题。

    关于azure - Azure 应用服务和 .NET Core 3.1 中长时间运行计算的合适解决方案?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63369558/

    相关文章:

    Azure MSAL JS : How to edit profile?

    entity-framework - 仅在 Azure Web 应用程序中出现单个查询的 SQL 连接超时错误

    azure - 将流写入 blob 时租用 blob,直至完成

    c# - 在Asp.Net Core 2.0中使用AllowHtml从数据库生成网页

    c# - 健康检查 MassTransit 和 Rabbitmq

    c# - ASP.NET CORE (.NET Framework) 和本地化

    scala - 使用 Kafka 在长时间运行的 Spark 作业之间进行通信

    java - 通知用户文件正在下载 servlet 中生成

    msbuild - 从命令行发布 Azure 项目

    c# - 长时间运行的线程或任务