我有一个带有事件中心触发器的 Azure 函数。此集线器从设备接收消息并将它们存储在 Blob 中。最近,我注意到重复的消息存储在 blob 中。 blob 存储中的文件按上次修改日期排序,如果您查看屏幕截图,您会发现情况并非如此。有没有人见过这个问题?
我还有一个 Azure 函数正在写入 cosmos DB,对于 blob 中的重复消息,在 cosmos 中没有相应的重复消息。
我还连接了时间序列见解,其中也没有任何重复的消息。
我打开了事件中心捕获,那里也没有重复的消息。
这是屏幕截图。
第一列是事件中心排队时间的 Unix 时间戳。如果我没有与文件名关联的 guid,它将引发异常。这是一个将数据存储在 blob 中的代码段。
dynamic msg = JObject.Parse(myEventHubMessage);
string deviceId = msg.deviceId;
if (deviceId == "5Y.....")
{
var filename = "_" + ((DateTimeOffset)enqueuedTimeUtc).ToUnixTimeSeconds() + "_" + Guid.NewGuid().ToString() + ".json";
var containerName = "containerName/";
var path = containerName + deviceId + "/" + filename;
using (var writer = binder.Bind<TextWriter>(new BlobAttribute(path)))
{
writer.Write(myEventHubMessage);
}
}
这里的逻辑非常简单。如果事件到达事件中心,则会触发该函数并将数据存储在 Azure Blob 中。
最佳答案
一个重要的说明是事件中心有一个 at-least-once delivery guarantee ;强烈建议确保您的处理以适合您的应用程序场景的任何方式对事件重复具有弹性。
对于您在本例中看到的重复,Azure Functions 的绑定(bind)使用了 EventProcessorHost
以便读取事件并触发功能代码的执行。随着 Azure 函数自动向上和向下扩展,EventProcessorHost
的实例将加入和离开负责处理配置的事件中心的消费者组。
当处理器启动时,它将尝试平衡处理工作与同一消费者组的其他事件处理器。如果处理器无法通过声明未拥有的分区来达到其公平的工作份额,它将尝试从其他处理器窃取分区的所有权。在此期间,新所有者将从上次记录的检查点开始读取。同时,旧的所有者可能正在将上次读取的事件分派(dispatch)给处理程序进行处理;在尝试从事件中心服务读取下一组事件之前,它不会理解所有权已更改。当处理器关闭并放弃其分区所有权时,会发生类似的模式。
因此,当处理器启动或停止时,您会看到一些重复事件正在处理,当处理器达到负载平衡的稳定状态时,这些事件会消退。该窗口的持续时间应该很短,但会因处理器的配置和所使用的检查点策略而异。
关于azure-functions - 带有事件中心触发器的 Azure 函数写入重复的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63401913/