以下方法执行 DFS 搜索并返回给定顶级项目 ID 的所有项目的列表。我如何修改它以利用并行处理?目前,获取子项的调用是针对堆栈中的每一项逐一进行的。如果我可以同时获取堆栈中多个项目的子项目,并更快地填充我的返回列表,那就太好了。我怎样才能以线程安全的方式做到这一点(使用 async/await 或 TPL,或其他任何东西)?
private async Task<IList<Item>> GetItemsAsync(string topItemId)
{
var items = new List<Item>();
var topItem = await GetItemAsync(topItemId);
Stack<Item> stack = new Stack<Item>();
stack.Push(topItem);
while (stack.Count > 0)
{
var item = stack.Pop();
items.Add(item);
var subItems = await GetSubItemsAsync(item.SubId);
foreach (var subItem in subItems)
{
stack.Push(subItem);
}
}
return items;
}
编辑: 我正在考虑一些类似的事情,但它没有结合在一起:
var tasks = stack.Select(async item =>
{
items.Add(item);
var subItems = await GetSubItemsAsync(item.SubId);
foreach (var subItem in subItems)
{
stack.Push(subItem);
}
}).ToList();
if (tasks.Any())
await Task.WhenAll(tasks);
更新: 如果我想批量处理任务,这样的方法可行吗?
foreach (var batch in items.BatchesOf(100))
{
var tasks = batch.Select(async item =>
{
await DoSomething(item);
}).ToList();
if (tasks.Any())
{
await Task.WhenAll(tasks);
}
}
我使用的语言是 C#。
最佳答案
下面是一个可用于异步并行遍历树的方法:
public static async Task<IEnumerable<T>> TraverseAsync<T>(
this IEnumerable<T> source,
Func<T, Task<IEnumerable<T>>> childSelector)
{
var results = new ConcurrentBag<T>();
Func<T, Task> foo = null;
foo = async next =>
{
results.Add(next);
var children = await childSelector(next);
await Task.WhenAll(children.Select(child => foo(child)));
};
await Task.WhenAll(source.Select(child => foo(child)));
return results;
}
该方法需要一个方法来异步获取每个节点的子节点,而您已经拥有了该方法。它并不生成根节点的特殊情况,因此您需要使用必须将它们置于此方法范围之外的方法,并将它们作为此方法的第一个参数提供。
调用代码可能如下所示:
var allNodes = await new[]{await GetItemAsync(topItemId)}
.TraverseAsync(item => GetSubItemsAsync(item.SubId));
该方法并行、异步地获取每个节点的子节点,当它们全部完成时将其自身标记为完成。然后,每个节点并行地递归计算其所有子节点。
您提到您担心使用递归,因为它会消耗堆栈空间,但这不是问题,因为这些方法是异步的。每次在递归中深入一层时,该方法不会在堆栈上更深一层;相反,它只是安排递归方法调用在稍后的时间点运行,因此每个级别始终从堆栈上的固定点开始。
如果您正在寻找一种限制并行数量的方法,因为担心并行数量太多,我首先请您尝试一下。如果您将这里的所有调用都定向到线程池,那么线程池本身可能会根据它认为可能执行得最好的方式在并行量上有一个上限。它只会停止创建更多线程,并在某个点之后将挂起的项目保留在队列中,并且线程池比您更有可能拥有有效的算法来确定适当的并行度。也就是说,如果您迫切需要人为限制超出线程池功能的并行量,那么肯定有办法。一种选择是创建您自己的同步上下文,人为地将挂起操作的数量限制为某个固定数量:
public class FixedDegreeSynchronizationContext : SynchronizationContext
{
private SemaphoreSlim semaphore;
public FixedDegreeSynchronizationContext(int maxDegreeOfParallelism)
{
semaphore = new SemaphoreSlim(maxDegreeOfParallelism,
maxDegreeOfParallelism);
}
public override async void Post(SendOrPostCallback d, object state)
{
await semaphore.WaitAsync().ConfigureAwait(false);
try
{
base.Send(d, state);
}
finally
{
semaphore.Release();
}
}
public override void Send(SendOrPostCallback d, object state)
{
semaphore.Wait();
try
{
base.Send(d, state);
}
finally
{
semaphore.Release();
}
}
}
您可以创建一个这样的上下文实例,并在调用 TraverseAsync
之前将其设置为当前上下文,或者创建另一个接受 maxDegreesOfParallelism
的重载并设置上下文在方法内部。
这种情况的另一种变化是限制子选择器的调用次数,而不对此处正在进行的任何其他异步操作的数量施加任何限制。 (其他的都不应该特别昂贵,所以我不希望它有多大影响,但这肯定是值得尝试的。)为此,我们可以创建一个任务队列来处理赋予它的项目具有固定的并行度,但这不会人为地限制任何未传递到该队列的内容。队列本身非常简单,作为同步上下文的直接变体:
public class FixedParallelismQueue
{
private SemaphoreSlim semaphore;
public FixedParallelismQueue(int maxDegreesOfParallelism)
{
semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
maxDegreesOfParallelism);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
在这里,当调用该方法时,您可以使用该队列作为子选择器的一部分:
ar taskQueue = new FixedParallelismQueue(degreesOfParallelism);
var allNodes = await new[]{await GetItemAsync(topItemId)}
.TraverseAsync(item =>
taskQueue.Enqueue(() => GetSubItemsAsync(item.SubId)));
关于c# - 如何使用异步/并行处理迭代执行深度优先搜索?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25432367/