Azure IoT Hub触发器按顺序处理数据

标签 azure azure-functions azure-iot-hub azure-triggers

我有一个 IoT 设备 (ESP32),每 x 秒向 IoT 中心(免费层)发送数据包。每个数据包都包含一个整数属性PacketID,每次发送新数据包时该属性都会递增,从1开始,依此类推。当 IoT 中心收到数据时,将调用 Azure IoT 中心触发器(托管在门户上)并进一步处理数据 - 首先插入到 CosmosDB(从 StorageQueue 触发器),然后作为消息发送到 SignalR 服务(从另一个 StorageQueue 触发器) )并连接到客户端 Web 应用程序。

IoT 中心触发器如下所示:

    [FunctionName("FramesPacketAdd_IoTHubTrigger")]
    public async Task Run(
    [IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message, ILogger _logger,
    [Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
    [Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
    {        
        try
        {
            _logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {Encoding.UTF8.GetString(message.Body.Array)}");

            string messageBody = Encoding.UTF8.GetString(message.Body.Array);
            var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);

            var framesPacket = new FramesPacket
            {
                PacketID = jsonObj.PacketID,
                SessionID = jsonObj.SessionID,                    
                DeviceID = jsonObj.DeviceID,
                Frames = jsonObj.Frames
            };

            await dbInsertFramesPacketQueue.AddAsync(framesPacket);
        }

        catch (Exception ex)
        {
            await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");

            _logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
        }
    }

问题是,当数据包发送速度很快时,例如每个数据包之间间隔 200 毫秒,IoT 中心接收到的数据完全无序,这意味着有时数据包 #7 和 #8 在数据包 #1、# 之前被接收和处理。 2 和 #3,这对于我的客户端应用程序来说是一个问题,因为它依赖于按照从 ESP32 芯片发送的相同顺序接收数据包。我尝试以 1 秒的延迟发送每个数据包,但问题仍然存在,但它似乎只影响前三个数据包 - 有时它们按 3、1、2 的顺序接收,其余的顺序正确。更长的延迟似乎可以完全消除这个问题。我确实相信这是因为函数/触发器的异步性质?最后,我希望能够相对较快地发送它们,每个之间有 200-500 毫秒的延迟。

我对 Azure IoT 中心非常陌生,我的问题是,是否可以在门户端执行某些操作来确保按正确的顺序接收数据包,或者这是此类方法和此类情况的限制接收到数据后需要处理,可能在调用 IoT 中心触发器后使用存储队列或服务总线?

我希望我的问题有意义,否则我非常乐意提供更多详细信息。

提前致谢。

最佳答案

以下是两种可能的路径:

1. 使用 EventHub 触发的 Azure Function 处理来自 IoT 中心的事件中心兼容终结点的消息。

保证来自特定设备的每条消息都将发送到同一分区。

分区中的消息是有序的,因此您需要的是每个分区的事件处理器。

您可以使用 Azure 函数来实现此目的,在本例中,Azure 函数所做的是为每个分区创建租约,这使得每个分区只有一个 Azure 函数实例。这意味着如果您有 10 个分区,则将有 10 个 Azure 函数实例 处理来自每个分区的消息。 现在您要做的是确保在Azure函数中,您按顺序处理消息批处理,例如:

[FunctionName("EventHubTrigger")]
public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
{
    log.Info($"Triggered batch of size {eventDataSet.Length}");
    
    //processing event by event
    foreach (var eventData in eventDataSet) {
        try
        {
            // process the event here
        }
        catch
        {
            // handle event exception
        }
    }
}

这里,在 foreach 循环之前,除了 native 行为之外,您还可以根据 sequenceNumber(如果您要发送它)对 eventDataSet 进行排序,仅用于案例。

这是blog .

2. 第二种方法非常相似,是关于使用服务总线队列和 session ,如以下代码示例所示:

public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // process the message here
}

这是blog也是如此。

需要注意的重要一点是,如果任何 Azure 函数实例(在第一种情况下)失败,租约将被续订,并且它可能会处理某些消息两次。 如果这是一个问题,那么您应该选择第二种情况。

关于Azure IoT Hub触发器按顺序处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64917114/

相关文章:

azure - 使用代码更改edgeAgent和edgeHub的 'createOptions'

c - Azure物联网中心: HTTP Device-to-Cloud Messages without SDK?

azure - 以编程方式更改预编译的 Azure Function C# 上的 cron 表达式

asp.net-mvc - Google OpenID 提供商在 Azure 上始终失败

c# - 在 Azure Function .net 5 中处理 blob 触发器时出现 Microsoft.Azure.Functions.Worker.Diagnostics.Exceptions.FunctionInputConverterException

c# - Azure 函数的 API 速率限制

azure - 辅助角色与 Azure Batch

c# - 无法加载文件或程序集 "System.Net.Http, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a"

c# - Azure Function - 在 Visual Studio 2017 中为不同环境添加不同的设置文件

Azure 流分析以逗号分隔