c# - 使用 C# 读取数百万个小文件

标签 c# multithreading file-io parallel-processing task-parallel-library

我有数百万个每天生成的日志文件,我需要读取所有这些文件并将它们放在一起作为一个文件,以便在其他应用程序中对其进行一些处理。

我正在寻找执行此操作的最快方法。目前我正在使用这样的线程、任务和并行:

Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
    ReadFiles(files[i]);
});

void ReadFiles(string file)
{
    try
    {
        var txt = File.ReadAllText(file);
        filesTxt.Add(tmp);
    }
    catch { }
    GlobalCls.ThreadNo--;
}

foreach (var file in files)
{
    //Int64 index = i;
    //var file = files[index];
    while (Process.GetCurrentProcess().Threads.Count > 100)
    { 
        Thread.Sleep(100);
        Application.DoEvents();
    }
    new Thread(() => ReadFiles(file)).Start();
    GlobalCls.ThreadNo++;
    // Task.Run(() => ReadFiles(file));      
}

问题是读取几千个文件后,读取越来越慢!!

知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。

最佳答案

看起来您正在加载内存中所有文件的内容,然后再将它们写回单个文件。这可以解释为什么这个过程会随着时间变慢。

优化流程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者消费者模式。它可以使用 Parallel 类、线程或任务来实现,但我将演示基于强大的 TPL Dataflow library 的实现。 ,特别适合这样的工作。

private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
    string targetFilePath, CancellationToken cancellationToken = default,
    IProgress<int> progress = null)
{
    var readerBlock = new TransformBlock<string, string>(async filePath =>
    {
        return File.ReadAllText(filePath); // Read the small file
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Reading is parallelizable
        BoundedCapacity = 100, // No more than 100 file-paths buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    StreamWriter streamWriter = null;

    int filesProcessed = 0;
    var writerBlock = new ActionBlock<string>(text =>
    {
        streamWriter.Write(text); // Append to the target file
        filesProcessed++;
        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // We can't parallelize the writer
        BoundedCapacity = 100, // No more than 100 file-contents buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    readerBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // This is a tricky part. We use BoundedCapacity, so we must propagate manually
    // a possible failure of the writer to the reader, otherwise a deadlock may occur.
    PropagateFailure(writerBlock, readerBlock);

    // Open the output stream
    using (streamWriter = new StreamWriter(targetFilePath))
    {
        // Feed the reader with the file paths
        foreach (var filePath in sourceFilePaths)
        {
            var accepted = await readerBlock.SendAsync(filePath,
                cancellationToken); // Cancel at any time
            if (!accepted) break; // This will happen if the reader fails
        }
        readerBlock.Complete();
        await writerBlock.Completion;
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex);
        }
    }
}

使用示例:

var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
    // Safe to update the UI
    Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
    SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

BoundedCapacity用于控制内存使用。

如果磁盘驱动器是 SSD,您可以尝试使用 MaxDegreeOfParallelism 读取大于 2。

为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。

TPL 数据流库可用 a package适用于 .NET Framework,并且内置于 .NET Core。

关于c# - 使用 C# 读取数百万个小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58151529/

相关文章:

c# - 隐藏表单上的最小化和最大化按钮而不隐藏标题

c++ - OpenMP 代码在线程池中执行

java - 如何释放 Activity 完成的线程?

java - 如何处理不完整的文件?获取异常

c# - 将多个模型的一个实例映射到 EF Core 中的一个属性

c# - SQL Server 匹配单词短语和顺序相关性的最佳方法

java - 对目录中的所有文件执行操作

c - 来自不同进程的两个文件描述符指向打开文件表中的相同条目

c# - 带有不同线条颜色的 ListBox ScrollBar

multithreading - App Engine Channel API 的线程安全/原子性