我有一个耐用的功能扇出和扇入模式,但似乎工作不可靠。每 10 分钟从计时器函数调用一次 Orchestration,但此后已将其增加到 20 分钟。 使用 context.CallActivityAsync 调用 Activity 函数并返回一个整数(已处理的行数)。目前,传入的 workItems 应该只包含 2 个要处理的项目。第一项处理所有行并在日志中显示完整的行。第二项有时显示正在处理的行,但在某些时候它只是停止...没有其他事件被识别,并且“完成”永远不会显示在日志中。另外,第二个事件有时会显示它同时运行多次...... 我已经在我的开发机器上使用相同的数据尝试了这个确切的代码,它处理完成的时间不超过 5 分钟。我还将hosts.json 文件设置为
{
"version": "2.0",
"functionTimeout": "00:10:00",
"extensions": {
"queues": {
"maxPollingInterval": "00:00:05"
}
}
}
编排:
public static async void RunOrchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
log.LogInformation($"************** Fanning out ********************");
var parallelTasks = new List<Task<int>>();
//object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
object[] workBatch = GETVendors(log); //work batch only has 2 items
for (int i = 0; i<workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
parallelTasks.Add(task);
}
log.LogInformation($"************** 'Waiting' for parallel results ********************");
await Task.WhenAll(parallelTasks);
log.LogInformation($"************** All activity functions complete ********************");
log.LogInformation($"************** fanning in ********************");
int cnt = 0;
foreach (var completedParallelActivity in parallelTasks)
{
cnt += completedParallelActivity.Result;
}
log.LogInformation($"Total Records Converted across all tasks = {cnt}");
//return outputs;
}
事件函数
public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
{
log.LogInformation($"SynchVendor {vendor}");
string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
int totalCount = 0;
using (SqlConnection conn = new SqlConnection(sqlStr))
{
conn.Open();
// lets synch the vendor
Int32 limit = 200;
bool initialLoad = false;
int offset = 0;
bool done = false;
do
{
//synch logic...
// if there are rows found to have changed then send them to a queue for further processing
} while (!done);
// we are done syncing a vendor write out the vendorinfo
conn.Close();
}
log.LogInformation($"SynchVendor {vendor} Complete");
return totalCount;
最佳答案
对于额外的日志记录,您需要将其添加到 log.Log*** 操作前面:
if (!context.IsReplaying)
对于永远看不到完成的问题,您可以执行以下操作:
- 您没有错误处理能力。如果您的事件函数抛出异常会发生什么?您应该使用带有日志的 Try/Catch 来让您知道何时出现故障。
- 功能日志显示什么?请注意,我不是在谈论 customDimensions.Category=="User"的日志(即您的日志条目),而是在谈论 Functions 运行时执行的日志。在 appinsights 日志中,在适当的时间范围内运行“union traces | union exceptions”,以查看 Functions 运行时正在执行的操作。
- 尝试添加超时任务,以便即使您的其中一项任务未完成,您的协调器也能完成。请参阅https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling?tabs=csharp#function-timeouts .
关于Azure Durable Function Activity 似乎运行了多次且未完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60215295/