我有大 (> 1 Gb) 文本文件。我需要以多线程方式逐行处理该文件(应用业务逻辑),所以我编写了下一个代码:
public Task Parse(Stream content, Action<Trade> parseCallback)
{
return Task.Factory.StartNew(() =>
{
using (var streamReader = new StreamReader(content))
{
string line;
while ((line = streamReader.ReadLine()) != null)
{
if (String.IsNullOrWhiteSpace(line))
{
continue;
}
var tokens = line.Split(TokensSeparator);
if (!tokens.Any() || tokens.Count() != 6)
{
continue;
}
Task.Factory.StartNew(() => parseCallback(new Trade
{
Id = Int32.Parse(tokens[0]),
MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
Quantity = Int64.Parse(tokens[3]),
TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
TradeType = tokens[5]
}),
TaskCreationOptions.AttachedToParent);
}
}
});
}
其中 Action parseCallback 将业务逻辑应用于从数据行创建的数据对象。
Parse() 方法返回任务,调用线程等待父任务完成:
try
{
var parseTask = parser.Parse(fileStream, AddTradeToTradeResult);
parseTask.Wait();
}
catch (AggregateException ae)
{
throw new ApplicationException(ae.Flatten().InnerException.Message, ae);
}
问题是:
- 很明显,while 循环中的任务创建速度比处理速度快。 TPL 将如何处理此类排队的任务?它们会等到线程池中的某个线程选择它们并执行,否则它们可能会丢失吗?
- 调用者线程 (parseTask.Wait()) 是主控制台应用程序线程。在大文件处理过程中,我能否与控制台应用程序窗口进行交互,否则它会被阻止?
- 我意识到所提供的方法是错误的。我怎样才能改进解决方案?例如:读取文件流并将数据放入主线程中的某个队列,在任务的帮助下处理队列项。其他一些方法?请给我方向。
最佳答案
你可以通过应用信号量来控制线程 如果需要,它将运行最多 320 个线程,然后等待完成较早的线程。
public class Utitlity
{
public static SemaphoreSlim semaphore = new SemaphoreSlim(300, 320);
public static char[] TokensSeparator = "|,".ToCharArray();
public async Task Parse(Stream content, Action<Trade> parseCallback)
{
await Task.Run(async () =>
{
using (var streamReader = new StreamReader(content))
{
string line;
while ((line = streamReader.ReadLine()) != null)
{
if (String.IsNullOrWhiteSpace(line))
{
continue;
}
var tokens = line.Split(TokensSeparator);
if (!tokens.Any() || tokens.Count() != 6)
{
continue;
}
await semaphore.WaitAsync();
await Task.Run(() =>
{
var trade = new Trade
{
Id = Int32.Parse(tokens[0]),
MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
Quantity = Int64.Parse(tokens[3]),
TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
TradeType = tokens[5]
};
parseCallback(trade);
});
semaphore.Release();
}
}
});
}
}
public class Trade
{
public int Id { get; set; }
public decimal MktPrice { get; set; }
public decimal Notional { get; set; }
public long Quantity { get; set; }
public DateTime TradeDate { get; set; }
public string TradeType { get; set; }
}
关于c# - 任务并行库 - 任务工厂行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31836182/