我有一个 IoT 设备 (ESP32),每 x 秒向 IoT 中心(免费层)发送数据包。每个数据包都包含一个整数属性PacketID,每次发送新数据包时该属性都会递增,从1开始,依此类推。当 IoT 中心收到数据时,将调用 Azure IoT 中心触发器(托管在门户上)并进一步处理数据 - 首先插入到 CosmosDB(从 StorageQueue 触发器),然后作为消息发送到 SignalR 服务(从另一个 StorageQueue 触发器) )并连接到客户端 Web 应用程序。
IoT 中心触发器如下所示:
[FunctionName("FramesPacketAdd_IoTHubTrigger")]
public async Task Run(
[IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message, ILogger _logger,
[Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
[Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
{
try
{
_logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {Encoding.UTF8.GetString(message.Body.Array)}");
string messageBody = Encoding.UTF8.GetString(message.Body.Array);
var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);
var framesPacket = new FramesPacket
{
PacketID = jsonObj.PacketID,
SessionID = jsonObj.SessionID,
DeviceID = jsonObj.DeviceID,
Frames = jsonObj.Frames
};
await dbInsertFramesPacketQueue.AddAsync(framesPacket);
}
catch (Exception ex)
{
await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
_logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {Encoding.UTF8.GetString(message.Body.Array)}");
}
}
问题是,当数据包发送速度很快时,例如每个数据包之间间隔 200 毫秒,IoT 中心接收到的数据完全无序,这意味着有时数据包 #7 和 #8 在数据包 #1、# 之前被接收和处理。 2 和 #3,这对于我的客户端应用程序来说是一个问题,因为它依赖于按照从 ESP32 芯片发送的相同顺序接收数据包。我尝试以 1 秒的延迟发送每个数据包,但问题仍然存在,但它似乎只影响前三个数据包 - 有时它们按 3、1、2 的顺序接收,其余的顺序正确。更长的延迟似乎可以完全消除这个问题。我确实相信这是因为函数/触发器的异步性质?最后,我希望能够相对较快地发送它们,每个之间有 200-500 毫秒的延迟。
我对 Azure IoT 中心非常陌生,我的问题是,是否可以在门户端执行某些操作来确保按正确的顺序接收数据包,或者这是此类方法和此类情况的限制接收到数据后需要处理,可能在调用 IoT 中心触发器后使用存储队列或服务总线?
我希望我的问题有意义,否则我非常乐意提供更多详细信息。
提前致谢。
最佳答案
以下是两种可能的路径:
1. 使用 EventHub 触发的 Azure Function 处理来自 IoT 中心的事件中心兼容终结点的消息。
保证来自特定设备的每条消息都将发送到同一分区。
分区中的消息是有序的,因此您需要的是每个分区的事件处理器。
您可以使用 Azure 函数来实现此目的,在本例中,Azure 函数所做的是为每个分区创建租约,这使得每个分区只有一个 Azure 函数实例。这意味着如果您有 10 个分区,则将有 10 个 Azure 函数实例 处理来自每个分区的消息。 现在您要做的是确保在Azure函数中,您按顺序处理消息批处理,例如:
[FunctionName("EventHubTrigger")]
public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
{
log.Info($"Triggered batch of size {eventDataSet.Length}");
//processing event by event
foreach (var eventData in eventDataSet) {
try
{
// process the event here
}
catch
{
// handle event exception
}
}
}
这里,在 foreach 循环之前,除了 native 行为之外,您还可以根据 sequenceNumber
(如果您要发送它)对 eventDataSet
进行排序,仅用于案例。
这是blog .
2. 第二种方法非常相似,是关于使用服务总线队列和 session ,如以下代码示例所示:
public async Task Run(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message,
ILogger log)
{
log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
// process the message here
}
这是blog也是如此。
需要注意的重要一点是,如果任何 Azure 函数实例(在第一种情况下)失败,租约将被续订,并且它可能会处理某些消息两次。 如果这是一个问题,那么您应该选择第二种情况。
关于Azure IoT Hub触发器按顺序处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64917114/