c# - 仅使用一小部分 CPU 在多个任务中运行 CPU 密集型方法?

标签 c# multithreading task-parallel-library html-agility-pack

我正在运行一个具有 24 个线程 (5900X) 的 CPU,启动 20 个任务来执行一个应该完全受 CPU 限制但 CPU 负载峰值为 10% 的操作。试图看看是否有人可以阐明这是我误解了任务如何线程化,还是执行处理的库 (HtmlAgilityPack) 有问题?

这是一个有点复杂的例子:

public async static Task TestHtmlAgilityPack(bool loadHtml = true)
{
    // "basePath" is a folder has approx 20 folders each containing approx 3000 files (20 tasks * 3,000 files = 60k overall)
    var dirs = Directory.GetDirectories(basePath);
    List<Task> tasks = new();
    var strs = new ConcurrentBag<string>();
    foreach (var dir in dirs)
    {
        tasks.Add(Task.Run(() =>
        {
            foreach (var file in Directory.GetFiles(dir, "*.html")) // Each of the 20 tasks processes approx 3000 files
            {
                var html = File.ReadAllText(file);
                strs.Add(html.Substring(1, 1000));
                if (loadHtml)
                {
                    var doc = new HtmlDocument();
                    doc.LoadHtml(html);
                }
            }
        }));
    }
    await Task.WhenAll(tasks);
    Console.WriteLine(strs.Last());
}

如果我在没有 LoadHtml 的情况下运行它,它将在 15 秒内完成,因此 IO 访问时间是微不足道的。使用 LoadHtml 现在需要 20 分钟,我知道将 HTML 解析为可查询的形式需要时间,这很好/预期,但令人困惑的是它(应该?)是一个纯粹的 CPU 密集型操作,它不等待任何东西。为什么在 24 线程 CPU 上通过 CPU 密集型操作加载 20 个线程时,CPU 峰值达到 10% 而不是接近 80%?

这是否表明 LoadHtml 方法或其他方法效率低下?

最佳答案

此代码存在几个限制其可扩展性的问题。

  • 它在不使用异步方法的情况下在同一任务中执行 IO 和 CPU 工作。您可以同时执行的 CPU 密集型任务的数量受内核数量的限制。您可以执行比这更多的异步任务。
  • IO 是阻塞的,不是异步的。这意味着任务(或者更确切地说,它的线程)在等待操作系统检索数据时什么都不做。
  • 代码读取了太多数据并生成了太多临时对象。 ReadAllText 在只需要 1000 个字符时读取整个文件。字符串是不可变的,因此 html.Substring(1,1000) 生成一个 子字符串。所有这些都会占用内存,并且必须在某个时刻进行垃圾回收。
  • ConcurrentBag 不是像 ConcurrentQueue 或 ConcurrentDictionary 这样的通用并发集合。它使用线程本地存储来确保创建项目的线程可以比其他线程更快地检索项目。

.NET 提供了几个高级类,可用于构建比加载、解析和导入文件复杂得多的解析管道。这些包括 Dataflow blocks , ChannelsAsync Streams/IAsyncEnumerable .

改进问题代码的一种方法是使用 Dataflow block 来枚举根文件夹、加载文件内容并在具有不同并行度的不同 block 中解析它。

首先,可以将抓取、加载和解析代码提取到单独的方法中:

record Search(DirectoryInfo root,string pattern);
record Sample(FileInfo file,string sample);
record SampleHtml(FileInfo file,HtmlDocument html);

IEnumerable<FileInfo> Crawl(Search search)
{
    var (root,pattern)=search;
    var searchOptions=new EnumerationOptions { 
        RecurseSubdirectories=true,
        IgnoreInaccessible=true
    };
    return root.EnumerateFiles(pattern,searchOptions);
}

async Task<Sample> ReadSample(FileInfo file,int length)
{
    var buffer=new char[length+1];
    using var reader=file.OpenText();
    var count=await reader.ReadBlockAsync(buffer,length+1);
    var html= new String(buffer,1,count-1);
    return new Sample(file,html);
}

SampleHtml ParseSample(Sample sample)
{
    var html=new HtmlDocument();
    html.LoadHtml(sample.sample);
    return new SampleHtml(sample.file,html);
}

数据流 block 可用于创建管道:

  1. 单线程文件搜索 block
  2. 加载程序 block 一次加载 2 个文件
  3. 一次解析 4 个样本的解析器 block
  4. 结果 BufferBlock收集解析器的输出
var loadOptions=new ExecutionDataflowBlockOptions{ 
    MaxDegreeOfParallelism=2,
    BoundedCapacity=1
};
var parseOptions=new ExecutionDataflowBlockOptions{ 
    MaxDegreeOfParallelism=4,
    BoundedCapacity=1
};

var crawler=new TransformManyBlock<Search,FileInfo>(search=>
    Crawl(search);

var loader =  new TransformBlock<FileInfo,Sample>(file=> 
    ReadSample(file,1000),loadOptions);

var parserBlock=new TransformBlock<Sample,SampleHtml>(sample=>
    ParseHtml(sample),parseOptions);

var results=new BufferBlock<SampleHtml>();

var linkOptions=new DataflowLinkOptions {
    PropagateCompletion = true
};
crawler.LinkTo(loader,linkOptions);
loader.LinkTo(parser,linkOptions);
//Don't propagate completion, we just cache results here
parser.Linkto(results);

为了使用管道,我们将搜索规范发布到头部 block crawler 并等待直到最后一个 block ,解析器,完成所有处理

var root=new DirectoryInfo(path_to_root);
var pattern="*.html";
await crawler.SendAsync(new Search(root,pattern));
crawler.Complete();
await parser.Completion;

这一点 results 包含所有结果。我们可以使用TryReceive 一个接一个地弹出项目或TryReceiveAll将所有内容读入容器:

if(results.TryReceiveAll(out var docs)
{
    var last=docs[^1];
}

loaderparser block 的 BoundedCapacity 为 1。这意味着它们的输入缓冲区将只接受超出正在处理的项目的单个项目.任何上游 block 都必须在发布新项目之前等待,一直到爬虫。这可以防止内存中充满处理速度不够快的对象。

重复使用缓冲区

ArrayPool类可以提供可重用的缓冲区,从而避免为每个文件创建新的 char[1001] 缓冲区。加载程序 DOP 为 4,这意味着我们只需要 4 个缓冲区而不是 3000 个缓冲区:

async Task<Sample> ReadSample(FileInfo file,int length)
{
    var buffer=ArrayPool<char>.Shared.Rent(length+1);
    try
    {
        using var reader=file.OpenText();
        var count=await reader.ReadBlockAsync(buffer,0,length+1);
        var html= new String(buffer,1,count-1);
        return new Sample(file,html);
    }
    finally
    {
        ArrayPool<char>.Shared.Return(buffer);
    }
}

这剩下 3000 个 1000 个字符的字符串对象。如果将加载器和解析器修改为传递 byte[] 缓冲区而不是字符串,则可以消除这些问题。 HtmlAgilityPack 的 Load 可以从任何 StreamStreamReader 读取,这意味着它也可以从包装缓冲区的 MemoryStream 加载。

唯一的问题是 UTF8 对每个字符使用的字节数可变,因此无法提前猜测准确读取 1000 个字符需要多少字节。如果 HTML 文件预计包含大量非英语文本,则必须增加 ReadSample 的长度。

record Sample(FileInfo file,byte[] buffer,int count);

async Task<Sample> ReadSample(FileInfo file,int length)
{
    var buffer=ArrayPool<char>.Shared.Rent(length+1);
    using var reader=file.OpenText();
    var count=await reader.ReadBlockAsync(buffer,0,length+1);
    return new Sample(file,buffer,count);
}

SampleHtml ParseSample(Sample sample)
{
    try
    {
        var html=new HtmlDocument();
        using var ms=new MemoryStream(sample.buffer,1,sample.count);
        html.Load(ms);
        return new SampleHtml(sample.file,html);
    }
    finally
    {
        ArrayPool<char>.Shared.Return(sample.buffer);
    }
}

关于c# - 仅使用一小部分 CPU 在多个任务中运行 CPU 密集型方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73182184/

相关文章:

c# - 以编程方式获取所有内置类型的列表

objective-c - 指定在主线程空闲时调用某些东西

c# - 无法将 TransactionScope 与任务一起使用

c# - 具有返回值和异步/等待的并行执行

c# - ExceptionHandler 被调用但不返回 JSON

javascript - 弹出窗口打开后重定向到页面

c# - 获取从A*出发的最优路径

multithreading - 使用多线程时Kotlin无法解析的引用

java - 每个连接的线程与每个请求的线程有什么区别?

c# - Parallel.For 使用步骤 != 1