在尝试高效地遍历目录树时,我尝试了一个描述为 here 的 RX 解决方案.虽然此解决方案适用于小树深度,但不适用于大树深度。默认调度程序创建了太多线程,从而减慢了树遍历。
这是我使用的代码:
public static void TestTreeTraversal()
{
Func<DirectoryInfo, IObservable<DirectoryInfo>> recurse = null;
recurse = i => Observable.Return(i)
.Concat(i.GetDirInfos().ToObservable().SelectMany(d => recurse(d)))
.ObserveOn(Scheduler.Default);
var obs = recurse(new DirectoryInfo(@"C:\"));
var result = obs.ToEnumerable().ToList();
}
public static IEnumerable<DirectoryInfo> GetDirInfos(this DirectoryInfo dir)
{
IEnumerable<DirectoryInfo> dirs = null;
try
{
dirs = dir.EnumerateDirectories("*", SearchOption.TopDirectoryOnly);
}
catch (Exception)
{
yield break;
}
foreach (DirectoryInfo d in dirs)
yield return d;
}
如果删除 ObserveOn(Scheduler.Default),该函数的运行速度与单线程递归函数相同。使用 ObserveOn,似乎每次调用 SelectMany 时都会创建一个线程,从而显着减慢进程。
有没有办法控制/限制调度器可以同时使用的最大线程数?
有没有另一种方法可以用 Rx 编写这样的并行树遍历,而不会陷入这种并行陷阱?
最佳答案
它可以在 Rx 中用 this overload of the Merge operator 完成,也许通过传递 Environment.ProcessorCount
到 maxConcurrent
参数。
然而,Rx 被设计用来处理 IObservable<T>
用于 native 异步处理。当然你可以转换 IEnumerable<T>
进入 IObservable<T>
并并行处理它,就像您在此处所做的那样,但这与 Rx 中的规则背道而驰。
这个问题更自然的解决方案是 PLINQ , 以 IEnumerable<T>
开头并且设计用于将查询划分为并行进程,隐含地考虑可用物理处理器的数量。
Rx 主要是关于驯服并发性,而 PLINQ 主要是关于引入并发性。
未测试:
Func<DirectoryInfo, ParallelQuery<DirectoryInfo>> recurse = null;
recurse = dir => new[] { dir }.AsParallel()
.Concat(dir.GetDirInfos().AsParallel().SelectMany(recurse));
var result = recurse(new DirectoryInfo(@"C:\")).ToList();
关于c# - 递归和 Rx 并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28090216/