c# - CPU 绑定(bind)任务的并行化继续与 IO 绑定(bind)

标签 c# multithreading asynchronous parallel-processing task

我正在尝试找出一种好的方法来对处理大数据集的代码进行并行化,然后将结果数据导入 RavenDb。

数据处理受 CPU 限制和数据库导入 IO 限制。

我正在寻找一种解决方案,以对 Environment.ProcessorCount 线程数进行并行处理。然后,应将生成的数据导入到与上述过程并行的 x(比方说 10)个池化线程上的 RavenDb。

这里的主要内容是我希望在导入完成的数据时继续处理,以便在等待导入完成时继续处理下一个数据集。

另一个问题是成功导入后需要丢弃每个批处理的内存,因为私有(private)工作内存很容易达到 >5GB。

下面的代码是我到目前为止所得到的。请注意,它不满足上述并行化要求。

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem 生成可枚举的数据项,这些数据项被划分为一个批处理数据集。 GetDataItem 将产生约 2,000,000 个项目,每个项目平均需要 0.3 毫秒左右的处理时间。

该项目在 x64 平台上的最新 .NET 4.5 RC 上运行。

更新。

我当前的代码(如上所示)将获取项目并分批对它们进行分区。每个批处理在八个线程上并行处理(i7 上的 Environment.ProcessorCount)。处理速度慢,受 CPU 限制且内存密集。 当单个批处理的处理完成时,将启动一个任务以将结果数据异步导入 RavenDb。批量导入作业本身是同步的,看起来像:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

这种方法存在一些问题:

  1. 每次完成一个批处理时,都会启动一个任务来运行导入作业。我想限制并行运行的任务数(例如最多 10 个)。此外,即使启动了许多任务,它们似乎也永远不会并行运行。

  2. 内存分配是一个大问题。处理/导入批处理后,它似乎仍保留在内存中。

我正在寻找解决上述问题的方法。理想情况下我想要:

  • 每个逻辑处理器一个线程负责处理大量数据。
  • 十个左右的并行线程等待完成的批处理导入 RavenDb。
  • 将内存分配保持在最低限度,这意味着在导入任务完成后取消分配批处理。
  • 不在其中一个线程上运行导入作业以进行批处理。已完成批处理的导入应与正在处理的下一批处理并行运行。

解决方案

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();

最佳答案

您的任务总体上听起来像是生产者-消费者工作流程。您的批处理器是生产者,而您的 RavenDB 数据“导入”是生产者输出的消费者。

考虑使用 BlockingCollection<T> 作为批处理程序和数据库导入程序之间的连接。一旦批处理器将完成的批处理推送到阻塞集合中,数据库导入器就会醒来,并在它们“ catch ”并清空集合后重新进入休眠状态。

批处理器生产者可以全速运行,并且将始终与处理先前完成的批处理的数据库导入器任务并发运行。如果您担心批处理器可能比数据库导入器领先太多(b/c 数据库导入比处理每个批处理花费的时间要长得多),您可以设置阻塞集合的上限,以便生产者在添加时阻塞超过这个限制,让消费者有机会 catch 。

不过,您的一些评论令人担忧。启动一个 Task 实例以异步执行数据库导入到批处理并没有什么特别的错误。任务!=线程。创建新的任务实例不会产生与创建新线程相同的巨大开销。

不要执着于过于精确地控制线程。即使您指定您想要的存储桶数量与您拥有的核心数量完全相同,您也不会独占使用这些核心。来自其他进程的数百个其他线程仍将在您的时间片之间进行调度。使用任务指定逻辑工作单元并让 TPL 管理线程池。避免因错误的控制感而感到沮丧。 ;>

在您的评论中,您指出您的任务似乎没有彼此异步运行(您如何确定这一点?)并且在每个批处理完成后似乎没有释放内存。我建议放弃一切,直到你能首先弄清楚这两个问题是怎么回事。您忘记在某处调用 Dispose() 了吗?您是否持有一个不必要地使整个对象树保持事件状态的引用?你测量的是正确的东西吗?并行任务是否由阻塞数据库或网络 I/O 序列化?在这两个问题得到解决之前,您的并行计划是什么并不重要。

关于c# - CPU 绑定(bind)任务的并行化继续与 IO 绑定(bind),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11676807/

相关文章:

c# - 如何获取鼠标右键事件?将 EventArgs 更改为 MouseEventArgs 会导致 Form1Designer 出错?

c# - 检测 int 中不为零的数字的方法?

c# - 确定目录中的文件数

c# - 如何在 C# 中保持线程存活

java - Camel : integration tests of asynchronous routes

c# - ASP.NET MVC3 : Validation Summary for Create Action (Server Side Validation)

c++ - 当其他线程处于事件状态并且具有 try-catch 时,无法在主线程中捕获异常

java - Quartz 作业已完成但线程仍处于阻塞状态

javascript - Node.js - 异步方法调用问题

java - 调用异步函数