我正在创建一个架构来处理来自电子商务网站的订单,该网站每小时收到 10,000 个或更多订单。我们正在使用外部第三方订单履行服务,他们有大约 5 个我们必须运行的相互依赖的步骤/API。
我正在考虑使用扇入/扇出方法,我们可以使用持久功能。
我的计划
- 在我们端创建订单后,我们会将其存储在带有“订单已完成”标志的表中。
- 运行时间触发器 azure 函数,该函数运行持久函数编排器,该编排器为每个步骤调用事件函数
现在,如果失败,计时器将再次接收订单,直到完成。但我的问题是我们是否应该将此订单放在服务总线中并从那里提取它而不是时间触发器。
因为每小时可能有超过 10,000 条记录,所以我们必须在时间触发器函数中运行查询并查找未完成的订单,并循环运行持久编排器 10,000 次。我的第一个问题 - 我可以并行运行 10,000 条记录的持久函数吗?
如果我使用服务总线触发器来触发持久协调器,它会自动并行运行 azure 函数和持久 10,000 次,对吗?但在这种情况下,我必须构建一个死信队列函数/进程,因此如果失败,我们可以将其移至事件主题
问题:
- 持久功能是正确的方法还是有更好、更简单的方法?
- 如果是,时间触发更好还是服务总线触发更好?
- 我可以通过时间触发 azure 函数并行运行持久函数协调器吗?我不是在谈论调用事件函数,因为它们不能并行运行,因为我们需要一个事件函数的输出作为下一个事件函数的输入
最佳答案
此用例适合function chaining 。这可以通过
来完成- 让排序系统将消息放入队列( storage 或 servicebus )
- 使用 storage queue trigger 创建一个 Azure 函数或service bus trigger 。这也将是 client function触发编排功能
- 创建 orchestration function调用 5 个步骤的 API,每个 API 一个事件函数(类似于 function chaining 示例中给出的内容。
- 创建五个 activity function , 每个 API 一个 f
订购系统
var clientOptions = new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets
};
//TODO: Replace the "<NAMESPACE-NAME>" and "<QUEUE-NAME>" placeholders.
client = new ServiceBusClient(
"<NAMESPACE-NAME>.servicebus.windows.net",
new DefaultAzureCredential(),
clientOptions);
sender = client.CreateSender("<QUEUE-NAME>");
var message = new ServiceBusMessage($"{orderId}");
await sender.SendMessageAsync(message);
客户端功能
public static class OrderFulfilment
{
[Function("OrderFulfilment")]
public static string Run([ServiceBusTrigger("<QUEUE-NAME>", Connection = "ServiceBusConnection")] string orderId,
[DurableClient] IDurableOrchestrationClient starter)
{
var logger = context.GetLogger("OrderFulfilment");
logger.LogInformation(orderId);
return starter.StartNewAsync("ChainedApiCalls", orderId);
}
}
编排功能
[FunctionName("ChainedApiCalls")]
public static async Task<object> Run([OrchestrationTrigger] IDurableOrchestrationContext fulfillmentContext)
{
try
{
// .... get order with orderId
var a = await context.CallActivityAsync<object>("ApiCaller1", null);
var b = await context.CallActivityAsync<object>("ApiCaller2", a);
var c = await context.CallActivityAsync<object>("ApiCaller3", b);
var d = await context.CallActivityAsync<object>("ApiCaller4", c);
return await context.CallActivityAsync<object>("ApiCaller5", d);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
事件功能
[FunctionName("ApiCaller1")]
public static string ApiCaller1([ActivityTrigger] IDurableActivityContext fulfillmentApiContext)
{
string input = fulfillmentApiContext.GetInput<string>();
return $"API1 result";
}
[FunctionName("ApiCaller2")]
public static string ApiCaller2([ActivityTrigger] IDurableActivityContext fulfillmentApiContext)
{
string input = fulfillmentApiContext.GetInput<string>();
return $"API2 result";
}
// Repeat 3 more times...
关于azure - 是否使用 Azure Durable Functions 进行订购流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74116518/