c# - 顺序执行的纯计算任务

标签 c# parallel-processing async-await

我尝试使用 async-await 并行运行几种计算密集型方法。

我有一个包含大约 80,000 个对象的列表,我将这些对象输入到返回任务的函数中:

public static void Main(string[] args)
{
    //...blah blah blah...

    var runner = new Runner(); //in a nutshell, I manage to get an object that has an async method on it.
    runner.Run().Wait(); //and I wait for it to complete.

    //...blah blah blah...
}

我的 runner 对象中有以下方法(或多或少......这是一个人为的例子):

public async Task Run()
{
    var items = ... //this is my list
    var tasks = items.Select(i => this.RunItemAsync(i)).ToArray();

    //I don't get here until the tasks are all finished...every single one...

    await Task.WhenAll(tasks).ConfigureAwait(false);
}

private async Task RunItemAsync(Item i)
{
    var subItems = i.GetSubItems();

    var tasks = subItems.Select(s => s.RunSubItemAsync(s)).ToArray();

    //I don't get here until the sub item tasks are all finished...

    await Task.WhenAll(tasks).ConfigureAwait(false);

    //does computations, doesn't wait on any async i/o, etc
    await this.ProcessAsync(i).ConfigureAwait(false);
}

private async Task RunSubItemAsync(SubItem s)
{
    //does computations, doesn't wait on any async i/o, etc
    ...
}

在过去一年左右的时间里,我一直在与 async await 作斗争,有时会使用 TPL Dataflow 实现出色的性能并制作一些非常酷的东西,但每隔一段时间我就会遇到这样的事情,但我做不到似乎得到任务来“激活”他们的并行能力。这个特定的项目将在大约 16 个内核的服务器上运行,所以我真的很想利用它。我的开发 VM 只分配了 2 个核心,但这仍然应该允许任务激活和并行运行(过去也是如此)。

我的观察

  • 我设法通过在 RunItemAsync 方法的开头插入一个小的 await Task.Delay(1).ConfigureAwait(false) 来并行运行。我知道这会创建某种形式的“呼吸室”,允许另一个任务使用该线程。然而,这还不够,因为它肮脏、不可靠,并且需要我有 Not Acceptable 延迟。
  • 如果没有前面提到的 Delay 调用,所有任务都在 Main Thread 上运行。这对我来说很明显,因为 Main 是启动这一切的函数。我对此没有问题,但我有 experiences过去,在 new Thread 创建的线程上运行任务会导致它无法使用默认任务调度程序运行,并且每个任务最终都会在该线程上按顺序运行。也许 Main Thread 属于这一类?

我的问题

我知道运行 ToArray 本身不会执行异步代码。然而,我希望发生的是,当我的RunItemAsync 方法到达它的第一个await 时,它将“停止”并允许下一次迭代调用 ToArray 运行。

我也明白添加 await Task.Delay 是有效的,因为它导致了我上面想要的结果。必须有某种方法可以做到这一点,而无需求助于 await Task.Delay...

我如何才能并行启动所有这些受计算限制的任务,而不会无意中导致它们按顺序运行?

最佳答案

目前有四种主要的并发库/技术可用。

  • async 最适合自然异步的单一操作,例如 I/O。
  • 任务并行库 (TPL) 最适合并行处理受 CPU 限制的工作。
  • TPL 数据流跨越异步 和并行,为处理数据提供网格/管道抽象。
  • Reactive Extensions (Rx) 在概念上类似于 TPL 数据流,但没有并行功能,而是具有许多与时间相关的功能。

在您的情况下,您希望使用 TPL。一个简单的 Parallel.ForEach 就足够了。

最后一点,同步代码(包括受 CPU 限制的并行代码)应该有一个同步 API;异步代码应该有一个异步 API。所以您希望您的 API 看起来是同步的,而不是异步的。

所以,像这样:

public static void Main(string[] args)
{
  var runner = new Runner();
  runner.Run();
}

public void Run()
{
  var items = ...
  Parallel.ForEach(items, i => this.RunItem(i));
}

private void RunItem(Item i)
{
  var subItems = i.GetSubItems();
  Parallel.ForEach(subItems, s => s.RunSubItem(s));
  this.Process(i);
}

private void RunSubItem(SubItem s)
{
  SemaphoreSlim.Wait(); // instead of WaitAsync
  ...
}

关于c# - 顺序执行的纯计算任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25901823/

相关文章:

c# - 运行 Fixtures 但不是并行 NUnit 中的方法

javascript - 返回 Promise 对象时使用 `Async` 关键字

c# - 修改 SQL Server Compact Edition 的默认连接字符串

c# - SyntaxWalker 不访问琐事

algorithm - 在所有 MPI 任务之间同步数据

c# - 在 ContinueWith() 之后,ConfigureAwait(False) 不会改变上下文

javascript - Protractor 异步/等待错误 : Unhandled promise rejection

c# - 在 ServiceStack AutoQuery 中多次加入同一个表

c# - ASP.NET MVC + 填充下拉列表

cuda - NVIDIA GPU 的 CUDA 核心和 OpenCL 计算单元之间有什么关系?