C# TPL 以并行方式调用任务并异步创建新文件

标签 c# .net concurrency async-await task-parallel-library

我正在尝试学习 TPL。我以这样的并行方式写入文件:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;

        Log("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);

        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}

然后这样调用它:

var tasks = new List<Task>();

try
{
    tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
    Log("Failed to save: " + ex.Message);
    throw;
}

tasks.Add(MySecondFuncAsync(), cts.Token);
//...
tasks.Add(MyLastFuncAsync(), cts.Token);

try
{
    //Or should I call await Task.WhenAll(tasks) ? What should I call here?
    Task.WaitAll(tasks.ToArray()); 
}
catch (AggregateException ex)
{
    foreach (var v in ex.InnerExceptions)
       Error(ex.Message + " " + v.Message);
}
finally
{
   cts.Dispose();
} 

foreach (task in tasks)
{
// Now, how to print results from the tasks? 
//Considering that all tasks return bool value, 
//I need to do something like this:
if (task.Status != TaskStatus.Faulted)
         Console.Writeline(task.Result);
else
         Log("Error...");
}

我的目标是让所有函数(SaveToFilesAsyncMySecondFuncAsync)以并行方式同时运行,使用计算机上的所有内核并节省时间。但是当我看到 SaveToFilesAsync 的日志时,我意识到保存到文件总是发生在同一个线程中,而不是并行。我究竟做错了什么?第二个问题:如何从代码末尾的任务列表中的每个任务中获取 Task.Result?如果第二个函数返回 Task(bool),我如何在我的代码中获取 bool 值?此外,非常欢迎所有关于我的代码的评论,因为我是 TPL 的新人。

最佳答案

您需要将 foreach 循环替换为从第一项到最后一项按顺序运行的 Parallel.ForEach() 循环,该循环可配置为并行性,或 Parallel.For() 可为您提供索引当前处理的项目。由于您需要为文件名使用计数器,因此您需要修改列表参数以提供您在创建列表时填充的文件编号,或使用 Parallel.For() 提供的索引。另一种选择是有一个长变量,您可以在创建文件名后对其执行 Interlocked.Increment,但我不确定这是否是最佳选择,我还没有尝试过。

这是它的样子。

将调用 SaveFilesAsync 的代码包装在 try/catch 中以处理通过 CancellationTokenSource 取消的操作

var cts = new CancellationTokenSource();

try
{
    Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
}
catch (Exception)
{
    Debug.Print("SaveFilesAsync Exception");
}
finally
{
    cts.Dispose();
}

然后用那个方法做你的并行。

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    int counter = 0;

    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };

    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();

                                // safely increment and get your next file number
                                int index = Interlocked.Increment(ref counter);
                                string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);

                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }

                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

您的代码的其他部分没有改变,只需调整您创建文件内容列表的位置。

编辑:围绕调用 SaveFileAsync 方法的 try/catch 实际上什么也没做,它全部在 SaveFileAsync 内部处理。

关于C# TPL 以并行方式调用任务并异步创建新文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31015213/

相关文章:

c# - 为什么表格单元格宽度不随 CSS 改变?

c# - 在c#中的每个打印页面中打印页码

c# - 如何在字符串中存储 arraylist 项

c# - 无法将 lambda 表达式转换为类型 "string"因为它不是委托(delegate)类型?

c# - 如何在.net core中执行api版本控制?

c# - 如何在内存中创建一个文本文件并在其上写一些东西并在客户端中打开记事本并在其中打开该文件?

c# - "The type ' 系统.Windows.Forms.TreeNodeCollection ' has no constructors defined"

c# - C#中并发安全自定义数据类型的实现

java - 如何中断未完成且不抛出 ThreadInterruptionException 的 Java 代码

java - ExecutorService 占用内存过多