azure - 如何确保队列消息已在 Azure Functions 中成功处理?

标签 azure azure-functions azure-webjobs azure-queues azure-http-trigger

我有一个使用 HTTP 触发器和队列触发器构建的 C# Azure Functions(在应用服务计划中)应用程序。该应用程序的工作原理是在客户端计算机上安装一个脚本,该脚本使用 SQL 查询从客户端数据库中提取各种文件,并将输出移动到临时 Azure Blob 存储。每个文件完成后,将调用 HTTP 触发器,为队列触发器创建一条队列消息,以获取该消息并将文件从临时 Blob 存储移动到 Blob 存储中的永久位置。 HTTP 触发器完成并将消息放入队列后,执行返回到客户端脚本以开始处理下一个 SQL 查询。

我担心的是,当队列触发器实际上仍在工作或可能失败时,特别是在并行处理多个客户端时,这些队列消息将堆积起来,并且客户端脚本将完成并显示错误的成功消息。有没有办法确保在继续下一个 SQL 查询之前队列消息已成功处理?

编辑:添加代码示例

我可能有 3 个客户端,其计算机上安装了应用程序,每个客户端都设置为在上午 12 点执行这些脚本,并且可以同时运行,因为它们托管在客户端计算机上。 客户端脚本

// perform SQL query to extract data from client database
// move extracted data to temporary Storage Blob hosted on the App Service storage account
return await httpClient.PostAsync(uri of the file in temporary blob storage)

当文件准备好处理时,第一个 await 会发布到 HTTP。
Azure Functions HTTP 触发器

// get storage account credentials
// write message to storage queue "job-submissions'
return new OkResult();

现在,“作业提交”队列中有来自多个客户端的文件。
Azure Functions 队列触发器

// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create meta file with info about the file 
// meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up  

所以问题是,当 HTTP 触发器将消息写入队列时,我无法知道队列已完成对文件的处理。现在这不是一个大问题,因为这个过程发生得非常快,当我在 HTTP 触发器中向队列发送消息时,队列最多只需要几秒钟来处理文件。我想知道各个作业何时完成的原因是因为我在客户端脚本中有最后一步:
客户端脚本

// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed" 
// queue message contains client name in the message 
// initialVisibilityDelay set to 2 minutes 
// this ensures queue has finished processing the files

然后,一个单独的 Python Azure 函数监听该队列以进行进一步处理:
Python QueueTrigger

# pick up message from "client-tasks-completed" queue
if 'client1' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database
elif 'client2' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database
elif 'client3' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure SQL database

Python Azure 函数在消耗计划中,batchSize 设置为 1,因为客户端文件有时可能很大,我不想超过 1.5 GB 内存限制。所以我有两个问题,第一个问题是我如何知道第一个队列触发器完成了它的工作?第二个问题是,如何确保Python QueueTrigger不会开始累积消息?我认为通过为监听同一队列的两个队列触发器创建单独的 Azure Functions 可以解决这两个问题。这会减轻双方的负担,但我不确定这是否是最佳实践。请参阅我的问题,其中我要求有关问题 2 的更多指导:Using multiple Azure Functions QueueTriggers to listen on the same storage queue

最佳答案

更新:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading;

namespace FunctionApp31
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {

            string a = "111";

            a=XX(a).Result;

            return new OkObjectResult(a);
        }

        public static async Task<string> XX(string x)
        {
            await Task.Run(()=>{
                Thread.Sleep(3000);
                x = x + "222";
                Console.WriteLine(x);
                }
                );

            return x;
        } 
    }
}

原始答案:

我建议您顺序执行处理逻辑,而不是异步执行。或者你可以等待异步操作完成后再返回,这样可以确保执行成功后再返回成功。(这样可以避免像你在评论中描述的那样,当队列仍在处理时返回结果。)

我注意到您提出了一个新问题。我认为您可以扩展实例而不是创建多个功能应用程序。 (当然创建多个功能应用没有问题)如果基于消耗计划,实例会根据负载自动伸缩。

关于azure - 如何确保队列消息已在 Azure Functions 中成功处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64685629/

相关文章:

azure - 尝试将 Azure 函数作为输出接收器添加到流分析作业时连接测试失败

azure - 为什么 Azure 事件中心 EventData 的 PartitionKey 字段为空?

Azure Function App无法将表绑定(bind)到类型 'Microsoft.WindowsAzure.Storage.Table.CloudTable'

java - 微软Azure表存储: CloudTable() error no suggestions available

c# - Azure函数: Could not load file or assembly Newtonsoft. Json

Azure 逻辑应用从单独接收的 xml 创建 JSON 数组

azure - 我应该如何将后端解决方案链接到 IoT 中心

c# - Azure函数: Unzip file works in debug but not in production

asp.net - 在计算模拟器中运行 Azure Web 角色时,web.config 文件在哪里?

azure - 如何在 Azure 聊天机器人中获取用户凭据