c# - 任务并行库 - 任务工厂行为

标签 c# multithreading asynchronous task-parallel-library large-files

我有大 (> 1 Gb) 文本文件。我需要以多线程方式逐行处理该文件(应用业务逻辑),所以我编写了下一个代码:

public Task Parse(Stream content, Action<Trade> parseCallback)
{    
   return Task.Factory.StartNew(() =>
   {
      using (var streamReader = new StreamReader(content))
      {
        string line;
        while ((line = streamReader.ReadLine()) != null)
        {
            if (String.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            var tokens = line.Split(TokensSeparator);
            if (!tokens.Any() || tokens.Count() != 6)
            {
                continue;
            }

            Task.Factory.StartNew(() => parseCallback(new Trade
            {
                Id = Int32.Parse(tokens[0]),
                MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
                Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
                Quantity = Int64.Parse(tokens[3]),
                TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
                TradeType = tokens[5]
            }),
            TaskCreationOptions.AttachedToParent);
        }
      }
   });
}

其中 Action parseCallback 将业务逻辑应用于从数据行创建的数据对象。

Parse() 方法返回任务,调用线程等待父任务完成:

try
{
   var parseTask = parser.Parse(fileStream, AddTradeToTradeResult);
   parseTask.Wait();
}
catch (AggregateException ae)
{
   throw new ApplicationException(ae.Flatten().InnerException.Message, ae);
}

问题是:

  1. 很明显,while 循环中的任务创建速度比处理速度快。 TPL 将如何处理此类排队的任务?它们会等到线程池中的某个线程选择它们并执行,否则它们可能会丢失吗?
  2. 调用者线程 (parseTask.Wait()) 是主控制台应用程序线程。在大文件处理过程中,我能否与控制台应用程序窗口进行交互,否则它会被阻止?
  3. 我意识到所提供的方法是错误的。我怎样才能改进解决方案?例如:读取文件流并将数据放入主线程中的某个队列,在任务的帮助下处理队列项。其他一些方法?请给我方向。

最佳答案

你可以通过应用信号量来控制线程 如果需要,它将运行最多 320 个线程,然后等待完成较早的线程。

 public class Utitlity
    {
        public static SemaphoreSlim semaphore = new SemaphoreSlim(300, 320);
        public static char[] TokensSeparator = "|,".ToCharArray();
        public async Task Parse(Stream content, Action<Trade> parseCallback)
        {
            await Task.Run(async () =>
            {
                using (var streamReader = new StreamReader(content))
                {
                    string line;
                    while ((line = streamReader.ReadLine()) != null)
                    {
                        if (String.IsNullOrWhiteSpace(line))
                        {
                            continue;
                        }

                        var tokens = line.Split(TokensSeparator);
                        if (!tokens.Any() || tokens.Count() != 6)
                        {
                            continue;
                        }
                        await semaphore.WaitAsync();
                        await Task.Run(() =>
                        {
                            var trade = new Trade
                        {
                            Id = Int32.Parse(tokens[0]),
                            MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
                            Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
                            Quantity = Int64.Parse(tokens[3]),
                            TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
                            TradeType = tokens[5]
                        };
                            parseCallback(trade);

                        });
                        semaphore.Release();
                    }
                }
            });
        }
    }

    public class Trade
    {
        public int Id { get; set; }
        public decimal MktPrice { get; set; }
        public decimal Notional { get; set; }
        public long Quantity { get; set; }
        public DateTime TradeDate { get; set; }
        public string TradeType { get; set; }


    }

关于c# - 任务并行库 - 任务工厂行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31836182/

相关文章:

c# - UWP InkCanvas 用颜色填充徒手绘制的形状

c# - 如何在多个标签中打印sqldatareader的一列所有数据?

c# - Resize 和 SizeChanged 事件之间的区别

linux - 没有可用的 boost 线程?

java - 如何在 C# 中使用 WebDriver 获取指定元素的屏幕截图

multithreading - 将推力与openmp一起使用: no substantial speed up obtained

javascript - NodeJS mysql同步查询

ios - swift +异步: How to execute a callback on the same thread where it was created?

javascript - 等待 x 个 promise 完成后生成新 promise 的好模式是什么?

python - 如何在python中跨线程共享全局变量?