线程Nesting await in Parallel.ForEach有一个答案建议使用 Task.WhenAll 并行运行多个 (MaxDegreeOfParallelism) 异步任务,而不是等到上一个任务完成。
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
并称其为
ids.ForEachAsync(10, async id =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
});
如果body有参数,我想在处理异常时知道参数值。如果任务主体因 id 失败,我需要记录异常,并指定它是针对特定 id 发生的。
我看过了 Accessing values in Task.ContinueWith ,但在 t.IsFaulted 时无法访问参数。
最后我在 lambda 体内添加了 try/catch,它似乎可以工作
ids.ForEachAsync(10, async id =>
{
try
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(id);
customers.Add(cust);
}
catch(Exception e)
{
_logger.LogError(e,” id=“+ id);
}
});
但是我不确定它是否正常工作(即异步,无阻塞)。
后来原答案的作者suggested在等待正文之前使用 var current = partition.Current,然后在延续中使用 current (ContinueWith(t => { ... })。–
谁能确认哪种方法更好?每种方法都有缺点吗?
最佳答案
将 await
包装在 try
/catch
中即可,请参阅:Catch an exception thrown by an async method 。与我的建议(捕获 partition.Current
并注入(inject)到 ContinueWith
延续)没有太大区别,只是它可能更高效,因为不涉及捕获。我认为它更具可读性和优雅性,ContinueWith
是一种“旧”的做事方式(pre async
/await
) .
请注意,您的示例将异常处理的负担传递给调用者(在本例中调用_logger.LogError
)。您需要确保这就是您想要的,而不是在 ForEachAsync 代码本身中嵌入一个包罗万象的内容来处理调用者确实让异常溜过的情况。像这样的东西:
while (partition.MoveNext())
{
try
{
await body(partition.Current)
}
catch (Exception e)
{
// of course here you don't know the type of T (partition.Current)
// or anything else about the operation for that matter
LogError("error processing: " + partition.Current + ": " + e);
}
}
关于.net - 如何在任务异常处理数组中包含任务参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48019075/