C# 并行库、XmlReader、XmlWriter

标签 c# multithreading parallel-processing xmlwriter

我有一个用例,我需要:

  • 遍历 Xml 文档中的每个输入节点
  • 对每个输入执行耗时的计算,以及
  • 将结果写入 XML 文件。

输入看起来像这样:

<Root>
  <Input>
    <Case>ABC123</Case>
    <State>MA</State>
    <Investor>Goldman</Investor>
  </Input>
  <Input>
    <Case>BCD234</Case>
    <State>CA</State>
    <Investor>Goldman</Investor>
  </Input>
</Root>

和输出:

<Results>
  <Output>
    <Case>ABC123</Case>
    <State>MA</State>
    <Investor>Goldman</Investor>
    <Price>75.00</Price>
    <Product>Blah</Product>
  </Output>
  <Output>
    <Case>BCD234</Case>
    <State>CA</State>
    <Investor>Goldman</Investor>
    <Price>55.00</Price>
    <Product>Ack</Product>
  </Output>
</Results>

我想并行运行计算;典型的输入文件可能有 50,000 个输入节点,没有线程的总处理时间可能为 90 分钟。大约 90% 的处理时间花费在步骤 #2(计算)上。

我可以遍历 XmlReader in parallel很容易:

static IEnumerable<XElement> EnumerateAxis(XmlReader reader, string axis)
{
  reader.MoveToContent();
  while (reader.Read())
  {
    switch (reader.NodeType)
    {
      case XmlNodeType.Element:
        if (reader.Name == axis)
        {
          XElement el = XElement.ReadFrom(reader) as XElement;
          if (el != null)
            yield return el;
        }
        break;
    }
  }
}
...
Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
{ 
  // do calc
  // lock the XmlWriter, write, unlock
});

我目前倾向于在写入 XmlWriter 时使用锁来确保线程安全。

在这种情况下,是否有更优雅的方式来处理 XmlWriter?具体来说,我是否应该让 Parallel.ForEach 代码将结果传回原始线程并让该线程处理 XmlWriter,从而避免锁定?如果是这样,我不确定正确的方法。

最佳答案

这是我最喜欢的一类问题:可以用管道解决的问题。

请注意,根据您的具体情况,这种方法实际上可能性能产生负面影响,但正如您明确询问如何在专用线程上使用编写器一样,下面的代码恰好演示了这一点。

免责声明:理想情况下,您应该为此考虑 TPL 数据流,但这不是我精通的东西,所以我只采用熟悉的 Task + BlockingCollection<T>路线。

起初我打算建议一个 3 阶段管道(读取、处理、写入),但后来我意识到您已经将前两个阶段与您“流式传输”节点的方式结合起来正在阅读并将它们提供给您的 Parallel.ForEach (是的,您已经实现了各种管道)。更好的是 - 更少的线程同步。

考虑到这一点,代码现在变成:

public class Result
{
    public string Case { get; set; }
    public string State { get; set; }
    public string Investor { get; set; }
    public decimal Price { get; set; }
    public string Product { get; set; }
}

...

using (var reader = CreateXmlReader())
{
    // I highly doubt that this collection will
    // ever reach its bounded capacity since
    // the processing stage takes so long,
    // but in case it does, Parallel.ForEach
    // will be throttled.
    using (var handover = new BlockingCollection<Result>(boundedCapacity: 100))
    {
        var processStage = Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(EnumerateAxis(reader, "Input"), node =>
                {
                    // Do calc.
                    Thread.Sleep(1000);

                    // Hand over to the writer.
                    // This handover is not blocking (unless our 
                    // blocking collection has reached its bounded
                    // capacity, which would indicate that the
                    // writer is running slower than expected).
                    handover.Add(new Result());
                });
            }
            finally
            {
                handover.CompleteAdding();
            }
        });

        var writeStage = Task.Run(() =>
        {
            using (var writer = CreateXmlReader())
            {
                foreach (var result in handover.GetConsumingEnumerable())
                {
                    // Write element.
                }
            }
        });

        // Note: the two stages are now running in parallel.
        // You could technically use Parallel.Invoke to
        // achieve the same result with a bit less code.
        Task.WaitAll(processStage, writeStage);
    }
}

关于C# 并行库、XmlReader、XmlWriter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24467373/

相关文章:

c# - ABCpdf nuget 包 XULRunner 文件夹已损坏?

c# - 使用 MahApp.Metro 主题自定义 SaveFileDialog

Linux:多核 CPU 中的进程和线程

Java程序无限循环,没有任何错误信息

c++ - 更改 lpthread 中的处理器数量时出现段错误(核心转储)

c# - 无法加载文件或程序集“System.Management.Automation,版本 = 3.0.0.0

c# - 使用 C# 连接到 MySql 数据库所需的参数

java - Java 中并发友好链表中的节点特定锁定

c++ - 将生成的进程的输出捕获到字符串

azure - 在 Azure 中同步并行操作