我有一个用例,我需要:
- 遍历 Xml 文档中的每个输入节点
- 对每个输入执行耗时的计算,以及
- 将结果写入 XML 文件。
输入看起来像这样:
<Root>
<Input>
<Case>ABC123</Case>
<State>MA</State>
<Investor>Goldman</Investor>
</Input>
<Input>
<Case>BCD234</Case>
<State>CA</State>
<Investor>Goldman</Investor>
</Input>
</Root>
和输出:
<Results>
<Output>
<Case>ABC123</Case>
<State>MA</State>
<Investor>Goldman</Investor>
<Price>75.00</Price>
<Product>Blah</Product>
</Output>
<Output>
<Case>BCD234</Case>
<State>CA</State>
<Investor>Goldman</Investor>
<Price>55.00</Price>
<Product>Ack</Product>
</Output>
</Results>
我想并行运行计算;典型的输入文件可能有 50,000 个输入节点,没有线程的总处理时间可能为 90 分钟。大约 90% 的处理时间花费在步骤 #2(计算)上。
我可以遍历 XmlReader in parallel很容易:
static IEnumerable<XElement> EnumerateAxis(XmlReader reader, string axis)
{
reader.MoveToContent();
while (reader.Read())
{
switch (reader.NodeType)
{
case XmlNodeType.Element:
if (reader.Name == axis)
{
XElement el = XElement.ReadFrom(reader) as XElement;
if (el != null)
yield return el;
}
break;
}
}
}
...
Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{
// do calc
// lock the XmlWriter, write, unlock
});
我目前倾向于在写入 XmlWriter 时使用锁来确保线程安全。
在这种情况下,是否有更优雅的方式来处理 XmlWriter?具体来说,我是否应该让 Parallel.ForEach 代码将结果传回原始线程并让该线程处理 XmlWriter,从而避免锁定?如果是这样,我不确定正确的方法。
最佳答案
这是我最喜欢的一类问题:可以用管道解决的问题。
请注意,根据您的具体情况,这种方法实际上可能对性能产生负面影响,但正如您明确询问如何在专用线程上使用编写器一样,下面的代码恰好演示了这一点。
免责声明:理想情况下,您应该为此考虑 TPL 数据流,但这不是我精通的东西,所以我只采用熟悉的 Task
+ BlockingCollection<T>
路线。
起初我打算建议一个 3 阶段管道(读取、处理、写入),但后来我意识到您已经将前两个阶段与您“流式传输”节点的方式结合起来正在阅读并将它们提供给您的 Parallel.ForEach
(是的,您已经实现了各种管道)。更好的是 - 更少的线程同步。
考虑到这一点,代码现在变成:
public class Result
{
public string Case { get; set; }
public string State { get; set; }
public string Investor { get; set; }
public decimal Price { get; set; }
public string Product { get; set; }
}
...
using (var reader = CreateXmlReader())
{
// I highly doubt that this collection will
// ever reach its bounded capacity since
// the processing stage takes so long,
// but in case it does, Parallel.ForEach
// will be throttled.
using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
{
var processStage = Task.Run(() =>
{
try
{
Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{
// Do calc.
Thread.Sleep(1000);
// Hand over to the writer.
// This handover is not blocking (unless our
// blocking collection has reached its bounded
// capacity, which would indicate that the
// writer is running slower than expected).
handover.Add(new Result());
});
}
finally
{
handover.CompleteAdding();
}
});
var writeStage = Task.Run(() =>
{
using (var writer = CreateXmlReader())
{
foreach (var result in handover.GetConsumingEnumerable())
{
// Write element.
}
}
});
// Note: the two stages are now running in parallel.
// You could technically use Parallel.Invoke to
// achieve the same result with a bit less code.
Task.WaitAll(processStage, writeStage);
}
}
关于C# 并行库、XmlReader、XmlWriter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24467373/