我正在尝试学习 TPL。我以这样的并行方式写入文件:
public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
int count = 0;
foreach (var str in list)
{
string fullPath = path + @"\" + count.ToString() + "_element.txt";
using (var sw = File.CreateText(fullPath))
{
await sw.WriteLineAsync(str);
}
count++;
Log("Saved in thread: {0} to {1}",
Environment.CurrentManagedThreadId,
fullPath);
if (ct.IsCancellationRequested)
ct.ThrowIfCancellationRequested();
}
}
然后这样调用它:
var tasks = new List<Task>();
try
{
tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
Log("Failed to save: " + ex.Message);
throw;
}
tasks.Add(MySecondFuncAsync(), cts.Token);
//...
tasks.Add(MyLastFuncAsync(), cts.Token);
try
{
//Or should I call await Task.WhenAll(tasks) ? What should I call here?
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ex)
{
foreach (var v in ex.InnerExceptions)
Error(ex.Message + " " + v.Message);
}
finally
{
cts.Dispose();
}
foreach (task in tasks)
{
// Now, how to print results from the tasks?
//Considering that all tasks return bool value,
//I need to do something like this:
if (task.Status != TaskStatus.Faulted)
Console.Writeline(task.Result);
else
Log("Error...");
}
我的目标是让所有函数(SaveToFilesAsync
、MySecondFuncAsync
)以并行方式同时运行,使用计算机上的所有内核并节省时间。但是当我看到 SaveToFilesAsync
的日志时,我意识到保存到文件总是发生在同一个线程中,而不是并行。我究竟做错了什么?第二个问题:如何从代码末尾的任务列表中的每个任务中获取 Task.Result?如果第二个函数返回 Task(bool),我如何在我的代码中获取 bool 值?此外,非常欢迎所有关于我的代码的评论,因为我是 TPL 的新人。
最佳答案
您需要将 foreach 循环替换为从第一项到最后一项按顺序运行的 Parallel.ForEach() 循环,该循环可配置为并行性,或 Parallel.For() 可为您提供索引当前处理的项目。由于您需要为文件名使用计数器,因此您需要修改列表参数以提供您在创建列表时填充的文件编号,或使用 Parallel.For() 提供的索引。另一种选择是有一个长变量,您可以在创建文件名后对其执行 Interlocked.Increment,但我不确定这是否是最佳选择,我还没有尝试过。
这是它的样子。
将调用 SaveFilesAsync 的代码包装在 try/catch 中以处理通过 CancellationTokenSource 取消的操作
var cts = new CancellationTokenSource();
try
{
Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
}
catch (Exception)
{
Debug.Print("SaveFilesAsync Exception");
}
finally
{
cts.Dispose();
}
然后用那个方法做你的并行。
public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
int counter = 0;
var options = new ParallelOptions
{
CancellationToken = token,
MaxDegreeOfParallelism = Environment.ProcessorCount,
TaskScheduler = TaskScheduler.Default
};
await Task.Run(
() =>
{
try
{
Parallel.ForEach(
list,
options,
(item, state) =>
{
// if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
options.CancellationToken.ThrowIfCancellationRequested();
// safely increment and get your next file number
int index = Interlocked.Increment(ref counter);
string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);
using (var sw = File.CreateText(fullPath))
{
sw.WriteLine(item);
}
Debug.Print(
"Saved in thread: {0} to {1}",
Thread.CurrentThread.ManagedThreadId,
fullPath);
});
}
catch (OperationCanceledException)
{
Debug.Print("Operation Canceled");
}
});
}
您的代码的其他部分没有改变,只需调整您创建文件内容列表的位置。
编辑:围绕调用 SaveFileAsync 方法的 try/catch 实际上什么也没做,它全部在 SaveFileAsync 内部处理。
关于C# TPL 以并行方式调用任务并异步创建新文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31015213/