我正在研究如何将长时间运行的流程分成小部分,以保持全局概览。 当前执行包括以下内容:
- 调用第一个 API 端点,该端点检索要处理的项目列表。
- 对于 1. 中的每一项,它都会调用另一个端点并将该数据保存到数据库。
我刚刚创建了第一个 Azure 函数,该函数从 1 检索数据,并创建了为每个项目调用的第二个函数。我以这样一种方式构建它:将 1 的结果推送到带有由 2 处理的消息的队列中。通过这种方式,还可以从我们的后台调用某个项目的下载。
现在我面临的主要问题是概述整个过程并知道它何时完成。 之前很简单,在 for-each 循环结束时执行终止。现在在总线上推送 n 条消息意味着我需要某种方法来检测状态。
我想到的一种可能性是,当我开始将项目推送到消息总线时,我还会附加一个指南(一种运行指南)和发送到总线的项目总数。我将此数据存储在临时表中,然后当每个 azure func 完成时,它将 guid 及其项目详细信息写入另一个表。我可以通过这种方式了解进度,也可能了解单个项目的状态。如果对于某个“ Actor ”来说,当我们处理完所有项目时也发送一封电子邮件,那就太好了,但这可以通过表上显示的另一个函数来完成。
这能行吗?
最佳答案
这看起来非常适合 fan-out pattern 的持久功能工作流程。应用:
您的函数 F1 调用第一个 API 端点,该端点检索要处理的项目列表。然后,对于每个项目,以该项目作为输入调用函数 F2。处理完所有项目后,您可以终止或调用另一个函数 F3 对结果(如果有)执行某些操作。
一旦调用 F1 的编排函数完成,您就知道所有项目都已处理完毕。一个例子是
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
请注意,获得结果和调用 F3 完全是可选的,您的用例可能不需要。
关于azure - 将长时间运行的作业解耦为 azure 函数..如何获取整体状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75844478/