c# - 将多线程访问的 ConcurrentBag 转储到文件的速度不够快

标签 c# task-parallel-library

我已经构建了这段代码来并行处理大量字符串之间的字符串比较,以加快速度。

我使用了 ConcurrentBag,因此所有线程(任务)都可以写入线程安全集合。然后我将这个集合转储到一个文件中。

我遇到的问题是 ConcurrentBag<string> log我转储到文件的数据比它写入文件的速度更快。所以我的程序不断消耗越来越多的内存,直到内存不足。

我的问题是我能做什么?改进写入日志 ?暂停任务直到 ConcurrentBag 被转储然后恢复任务?什么是最快的选择?

代码如下:

CsvWriter csv = new CsvWriter(@"C:\test.csv");

List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
ConcurrentBag<string> log = new ConcurrentBag<string>();
int i = 0;

var taskWriteToLog = new Task(() =>
{
    // Consume the items in the bag
    string item;
    while (true)  //  (!log.IsEmpty)
    {
        if (!log.IsEmpty)
        {
            if (log.TryTake(out item))
            {
                csv.WriteLine(item);
            }
            else
                Console.WriteLine("Concurrent Bag busy");
        }
        else
        {
            System.Threading.Thread.Sleep(1000);
        }
    }
});

taskWriteToLog.Start();

Parallel.ForEach(bailleurs, s1 =>
{
    foreach (Bailleur s2 in bailleurs)
    {
        var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
        string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2);
        log.Add(line);
        // Console.WriteLine(line);

        var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
        line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic);
        log.Add(line);
        // Console.WriteLine(line);
    }
    i++;
    Console.WriteLine(i);
});

public class CsvWriter
{
    public string FilePath { get; set; }
    private FileStream _fs { get; set; }
    private StreamWriter _sw { get; set; }

    public CsvWriter2(string filePath)
    {
        FilePath = filePath;
        _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
        _sw = new StreamWriter(_fs);
    }

    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }
}

最佳答案

不要直接使用并发包,使用一个BlockingCollection将并发包作为后备存储(默认情况下它是并发队列)。

constructor overloads 之一允许您设置集合大小的上限,如果袋子已满,它将阻塞插入线程,直到有空间插入。

它还为您提供了 GetConsumingEnumerable()这使得从包中取出元素变得非常容易,您只需在 foreach 循环中使用它,它将一直提供您的消费者数据,直到 CompleteAdding。叫做。之后它一直运行直到包为空,然后像任何其他已完成的正常 IEnumerable 一样退出。如果包在调用 CompleteAdding 之前“变干”,它将阻塞线程并在将更多数据放入包中时自动重新启动。

void ProcessLog()
{
    CsvWriter csv = new CsvWriter(@"C:\test.csv");

    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();

    const int MAX_BAG_SIZE = 500;
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);

    int i = 0;

    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag, no need for sleeps or poleing, When items are available it runs, when the bag is empty but CompletedAdding has not been called it blocks.
        foreach(string item in log.GetConsumingEnumerable())
        {
            csv.WriteLine(item);
        }
    });

    taskWriteToLog.Start();

    Parallel.ForEach(bailleurs, s1 =>
    {
        //Snip... You can switch to BlockingCollection without any changes to this section of code.
    });

    log.CompleteAdding(); //lets anyone using GetConsumingEnumerable know that no new items are comming so they can leave the foreach loops when the bag becomes empty.
}

关于c# - 将多线程访问的 ConcurrentBag 转储到文件的速度不够快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18267414/

相关文章:

c# - 如何解析 .msstyles 文件?

c# - 字符串别名在没有 System.String 的情况下工作,但 String 不工作

c# - 带参数的任务列表

c# - 并发网络请求性能问题

c# - 为什么长时间运行的 Task 仍然会阻塞 UI?

c# - 将 "% of row"计算添加到数据透视表中

c# - 可能是GeoShape过滤器(NEST)中的错误

c# - Linq 按列表中的值分组

c# - 捕获 AggregateException

c# - 有条件地继续等待任务的一件事