C# Parallel.foreach - 使变量线程安全

标签 c# locking task-parallel-library parallel.foreach

我一直在重写一些进程密集型循环以使用 TPL 来提高速度。这是我第一次尝试线程化,所以想检查我正在做的是正确的方法。

结果很好——处理 DataTable 中 1000 行的数据将处理时间从 34 分钟减少到 9 分钟Parallel.ForEach 循环。对于此测试,我删除了非线程安全操作,例如将数据写入日志文件和递增计数器。

我仍然需要写回日志文件并递增计数器,所以我尝试实现一个锁来封装流编写器/递增代码块。

FileStream filestream = new FileStream("path_to_file.txt", FileMode.Create);
StreamWriter streamwriter = new StreamWriter(filestream);
streamwriter.AutoFlush = true;

try
{
    object locker = new object();

    // Lets assume we have a DataTable containing 1000 rows of data.
    DataTable datatable_results;

    if (datatable_results.Rows.Count > 0)
    {
        int row_counter = 0;

        Parallel.ForEach(datatable_results.AsEnumerable(), data_row =>
        { 
            // Process data_row as normal.

            // When ready to write to log, do so.
            lock (locker)
            {
                row_counter++;
                streamwriter.WriteLine("Processing row: {0}", row_counter);

                // Write any data we want to log.
            }
        });                    
    }
}
catch (Exception e)
{
    // Catch the exception.
}
streamwriter.Close();

以上似乎按预期工作,性能成本最低(仍然是 9 分钟的执行时间)。诚然,锁中包含的操作本身并不重要 - 我假设随着在锁中处理代码所花费的时间增加,线程被锁定的时间越长,它对处理时间的影响就越大。

我的问题:以上是一种有效的方法吗?或者是否有更快或更安全的不同方法来实现上述目的?

此外,假设我们的原始 DataTable 实际上包含 30000 行。将此 DataTable 拆分为每 block 1000 行,然后在 Parallel.ForEach 中处理它们,而不是一次性处理所有 300000 行,是否可以获得任何好处?

最佳答案

写入文件的开销很大,写入文件时您持有独占锁,这很糟糕。它将引入争用。

您可以将其添加到缓冲区中,然后一次写入文件。这应该会消除争用并提供扩展方式。

if (datatable_results.Rows.Count > 0)
{
    ConcurrentQueue<string> buffer = new ConcurrentQueue<string>();
    Parallel.ForEach(datatable_results.AsEnumerable(), (data_row, state, index) =>
    {
        // Process data_row as normal.

        // When ready to write to log, do so.

        buffer.Enqueue(string.Format( "Processing row: {0}", index));
    });

    streamwriter.AutoFlush = false;
    string line;
    while (buffer.TryDequeue(out line))
    {
        streamwriter.WriteLine(line);
    }
    streamwriter.Flush();//Flush once when needed
}
  1. 请注意,您不需要维护循环计数器, Parallel.ForEach 为您提供了一个。不同之处在于它不是 计数器但索引。如果我改变了预期的行为,你可以 仍然添加计数器并使用 Interlocked.Increment 来 增加它。
  2. 我看到您正在使用 streamwriter.AutoFlush = true,这会影响性能,您可以将其设置为 false 并在完成写入后将其刷新所有数据。

如果可能,将 StreamWriter 包装在 using 语句中,这样您甚至不需要刷新流(您可以免费获得)。

或者,您可以查看可以很好地完成其工作的日志记录框架。示例:NLog、Log4net 等。

关于C# Parallel.foreach - 使变量线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26975318/

相关文章:

c# - 线程安全的内存

C#锁单行(if语句)

c# - 如何使用新的任务并行库确保两个任务在不同的线程甚至不同的处理器上运行?

c# - 将 TryDequeue 放在 while 循环中是否安全?

c# - 如何使用 protobuf 将 ServiceStack 服务与非 ServiceStack 客户端集成?

c# - protobuf-net 与 Windows azure(Web 角色)兼容吗?

c# - "Chunked"内存流

c# - 显示表单会锁定单声道

c# - 如果客户端进程在事务开始后被终止,数据库表将保持锁定状态

c# - 使用超时运行 Parallel ForEach