我正在处理 azure 函数和 azure web 作业。
我想要做的是,将消息发送到服务总线主题,然后从那里,消息在下面的两个内容中进行处理,
- 首先通过 Azure 函数 - 从一个订阅中读取,然后应用程序发送 给 Cosmos DB 的消息
- 其次是 Azure Web 作业 - 从另一个订阅读取并发送消息 到 Azure Blob 存储。
它们都使用 Azure 服务总线触发器。
所以,如果消息数量非常少,例如每秒 1 到 5 条消息,它将得到处理。但是,如果我同时发送 30-40 条消息,函数应用程序和 Web 作业都会丢失一些消息,我的意思是,如果我发送 30 条消息并检查 blob,它会显示仅收到 25 条消息。
但是,当我添加一个按时间触发器运行的 azure 功能应用程序时,它可以完美地处理 30 个发送 -30 个处理,但我真的不想使用时间触发器,因为它一天内点击次数太多。
如果有人能帮助我,那就太好了。
函数触发器:
public static void ProcessArchiveQueueMessage([ServiceBusTrigger("*****", "*****", Connection = "ServiceBusConnectionString")] string message, ILogger log)
{
ArchiveDll.ProcessArchiveTopic("****", log);
}
DLL 代码:
subscriptionClient.OnMessageAsync(m =>
{
try
{
Stream stream = m.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
topicData = reader.ReadToEnd();
var jObj = JsonConvert.DeserializeObject<JObject>(topicData);
StoreJsoninBlob(cloudContainer, stream, jObj);
StoreInCosmosDB(log, jObj);
subscriptionClient.Complete(m.LockToken);
}
catch (Exception ex)
{
subscriptionClient.DeadLetter(m.LockToken);
}
return Task.CompletedTask;
}, new Microsoft.ServiceBus.Messaging.OnMessageOptions()
{
AutoComplete = false,
MaxConcurrentCalls = 1
});
}
catch (Exception e)
{
}
}
最佳答案
下面是我用来解决上述问题的最新代码。
public static void ProcessArchiveTopic(string subscriptionName, ILogger log, string message)
{
log.LogInformation("Registering message listener for " + subscriptionName);
//Below method is triggered every time when a new message is read from queue.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationSettings.AppSettings["BlobConnectionString"]);
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
CloudBlobContainer cloudContainer = blobClient.GetContainerReference(ConfigurationSettings.AppSettings["BlobContainerName"]);
cloudContainer.CreateIfNotExists();
try
{
#region"function app reading message"
log.LogInformation("Processing message for subscription: " + subscriptionName);
var jObj = JsonConvert.DeserializeObject<JObject>(message);
log.LogInformation("Converted message to Json");
#endregion
StoreJsoninBlob(cloudContainer, GenerateStreamFromString(message), jObj);
StoreInCosmosDB(log, jObj);
}
关于具有服务总线触发器的 Azure 函数应用程序和 Web 作业在处理时丢失一些消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59869750/