具有服务总线触发器的 Azure 函数应用程序和 Web 作业在处理时丢失一些消息

标签 azure azure-functions azure-webjobs azure-servicebus-queues

我正在处理 azure 函数和 azure web 作业。

我想要做的是,将消息发送到服务总线主题,然后从那里,消息在下面的两个内容中进行处理,

  • 首先通过 Azure 函数 - 从一个订阅中读取,然后应用程序发送 给 Cosmos DB 的消息
  • 其次是 Azure Web 作业 - 从另一个订阅读取并发送消息 到 Azure Blob 存储。

它们都使用 Azure 服务总线触发器。

所以,如果消息数量非常少,例如每秒 1 到 5 条消息,它将得到处理。但是,如果我同时发送 30-40 条消息,函数应用程序和 Web 作业都会丢失一些消息,我的意思是,如果我发送 30 条消息并检查 blob,它会显示仅收到 25 条消息。

但是,当我添加一个按时间触发器运行的 azure 功能应用程序时,它可以完美地处理 30 个发送 -30 个处理,但我真的不想使用时间触发器,因为它一天内点击次数太多。

如果有人能帮助我,那就太好了。

函数触发器:

        public static void ProcessArchiveQueueMessage([ServiceBusTrigger("*****", "*****", Connection = "ServiceBusConnectionString")] string message, ILogger log)
        {
            ArchiveDll.ProcessArchiveTopic("****", log);
        }

DLL 代码:

       subscriptionClient.OnMessageAsync(m =>
                {
                    try
                    {
                        Stream stream = m.GetBody<Stream>();
                        StreamReader reader = new StreamReader(stream);
                        topicData = reader.ReadToEnd();
                        var jObj = JsonConvert.DeserializeObject<JObject>(topicData);

                        StoreJsoninBlob(cloudContainer, stream, jObj);

                        StoreInCosmosDB(log, jObj);

                        subscriptionClient.Complete(m.LockToken);
                    }
                    catch (Exception ex)
                    {
                        subscriptionClient.DeadLetter(m.LockToken);
                    }

                    return Task.CompletedTask;

                }, new Microsoft.ServiceBus.Messaging.OnMessageOptions()
                {
                    AutoComplete = false,
                    MaxConcurrentCalls = 1
                });
            }
            catch (Exception e)
            {

            }
        }

最佳答案

下面是我用来解决上述问题的最新代码。

public static void ProcessArchiveTopic(string subscriptionName, ILogger log, string message)
{

  log.LogInformation("Registering message listener for " + subscriptionName);

  //Below method is triggered every time when a new message is read from queue.

  CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationSettings.AppSettings["BlobConnectionString"]);
  CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
  CloudBlobContainer cloudContainer = blobClient.GetContainerReference(ConfigurationSettings.AppSettings["BlobContainerName"]);
  cloudContainer.CreateIfNotExists();

  try
  {
      #region"function app reading message"

      log.LogInformation("Processing message for subscription: " + subscriptionName);
      var jObj = JsonConvert.DeserializeObject<JObject>(message);
      log.LogInformation("Converted message to Json");
      
      #endregion

      StoreJsoninBlob(cloudContainer, GenerateStreamFromString(message), jObj);
      StoreInCosmosDB(log, jObj);
  }

关于具有服务总线触发器的 Azure 函数应用程序和 Web 作业在处理时丢失一些消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59869750/

相关文章:

azure - 使用部署槽作为单例运行 azure webjob

azure - 如何在 Azure 中使用 NEST/elasticsearch?

azure - 2 个 VNET 与 1 个具有 2 个子网的 VNET?

azure - Azure 数据平台中的 AAS 表格模型或多维 SSAS 与 Azure Synapse 之间的最佳方法是什么

c# - 拦截 Azure 函数主机关闭 : Flush Application Insights TelemetryClient

python - Azure 使用 python Flask 框架作为函数应用程序

azure - 需要澄清 Azure Functions 生态系统

azure - 我可以从 Databricks 中的当前笔记本启动另一个集群吗?

azure - 预编译的 Azure 函数行为异常

python - 多个 Azure Web 作业