我尝试使用 async-await 并行运行几种计算密集型方法。
我有一个包含大约 80,000 个对象的列表,我将这些对象输入到返回任务的函数中:
public static void Main(string[] args)
{
//...blah blah blah...
var runner = new Runner(); //in a nutshell, I manage to get an object that has an async method on it.
runner.Run().Wait(); //and I wait for it to complete.
//...blah blah blah...
}
我的 runner 对象中有以下方法(或多或少......这是一个人为的例子):
public async Task Run()
{
var items = ... //this is my list
var tasks = items.Select(i => this.RunItemAsync(i)).ToArray();
//I don't get here until the tasks are all finished...every single one...
await Task.WhenAll(tasks).ConfigureAwait(false);
}
private async Task RunItemAsync(Item i)
{
var subItems = i.GetSubItems();
var tasks = subItems.Select(s => s.RunSubItemAsync(s)).ToArray();
//I don't get here until the sub item tasks are all finished...
await Task.WhenAll(tasks).ConfigureAwait(false);
//does computations, doesn't wait on any async i/o, etc
await this.ProcessAsync(i).ConfigureAwait(false);
}
private async Task RunSubItemAsync(SubItem s)
{
//does computations, doesn't wait on any async i/o, etc
...
}
在过去一年左右的时间里,我一直在与 async await 作斗争,有时会使用 TPL Dataflow 实现出色的性能并制作一些非常酷的东西,但每隔一段时间我就会遇到这样的事情,但我做不到似乎得到任务来“激活”他们的并行能力。这个特定的项目将在大约 16 个内核的服务器上运行,所以我真的很想利用它。我的开发 VM 只分配了 2 个核心,但这仍然应该允许任务激活和并行运行(过去也是如此)。
我的观察
- 我设法通过在
RunItemAsync
方法的开头插入一个小的await Task.Delay(1).ConfigureAwait(false)
来并行运行。我知道这会创建某种形式的“呼吸室”,允许另一个任务使用该线程。然而,这还不够,因为它肮脏、不可靠,并且需要我有 Not Acceptable 延迟。 - 如果没有前面提到的
Delay
调用,所有任务都在Main Thread
上运行。这对我来说很明显,因为Main
是启动这一切的函数。我对此没有问题,但我有 experiences过去,在new Thread
创建的线程上运行任务会导致它无法使用默认任务调度程序运行,并且每个任务最终都会在该线程上按顺序运行。也许Main Thread
属于这一类?
我的问题
我知道运行 ToArray
本身不会执行异步代码。然而,我希望发生的是,当我的RunItemAsync
方法到达它的第一个await
时,它将“停止”并允许下一次迭代调用 ToArray
运行。
我也明白添加 await Task.Delay
是有效的,因为它导致了我上面想要的结果。必须有某种方法可以做到这一点,而无需求助于 await Task.Delay
...
我如何才能并行启动所有这些受计算限制的任务,而不会无意中导致它们按顺序运行?
最佳答案
目前有四种主要的并发库/技术可用。
async
最适合自然异步的单一操作,例如 I/O。- 任务并行库 (TPL) 最适合并行处理受 CPU 限制的工作。
- TPL 数据流跨越
异步
和并行,为处理数据提供网格/管道抽象。 - Reactive Extensions (Rx) 在概念上类似于 TPL 数据流,但没有并行功能,而是具有许多与时间相关的功能。
在您的情况下,您希望使用 TPL。一个简单的 Parallel.ForEach
就足够了。
最后一点,同步代码(包括受 CPU 限制的并行代码)应该有一个同步 API;异步代码应该有一个异步 API。所以您希望您的 API 看起来是同步的,而不是异步的。
所以,像这样:
public static void Main(string[] args)
{
var runner = new Runner();
runner.Run();
}
public void Run()
{
var items = ...
Parallel.ForEach(items, i => this.RunItem(i));
}
private void RunItem(Item i)
{
var subItems = i.GetSubItems();
Parallel.ForEach(subItems, s => s.RunSubItem(s));
this.Process(i);
}
private void RunSubItem(SubItem s)
{
SemaphoreSlim.Wait(); // instead of WaitAsync
...
}
关于c# - 顺序执行的纯计算任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25901823/