.net - TPL 数据流与普通信号量

标签 .net task-parallel-library semaphore dataflow tpl-dataflow

我需要制作可扩展的流程。该进程主要有 I/O 操作和一些次要的 CPU 操作(主要是反序列化字符串)。该过程在数据库中查询 url 列表,然后从这些 url 中获取数据,将下载的数据反序列化为对象,然后将一些数据持久化到 crm 动态中,也保存到另一个数据库中。之后,我需要更新处理 url 的第一个数据库。部分要求是使并行度可配置。

最初我想通过一系列带有 await 的任务来实现它,并使用信号量限制并行性 - 非常简单。然后我在这里阅读了@Stephen Cleary 推荐使用 TPL Dataflow 的一些帖子和答案,我认为它可能是一个很好的候选人。但是,我想通过将 Dataflow 用于有值(value)的原因来确保我“使代码复杂化”。我也得到了使用 ForEachAsync extension method 的建议这也很容易使用,但是我不确定它是否会因为它对集合进行分区的方式而导致内存开销。

对于这种情况,TPL Dataflow 是一个不错的选择吗?它如何比信号量或 ForEachAsync 方法更好 - 如果我通过 TPL DataFlow 实现它而不是其他每个选项(信号量/ForEachASync),我实际上会获得什么好处?

最佳答案

The process has mainly IO operations with some minor CPU operations (mainly deserializing strings).



这几乎只是 I/O。除非这些字符串是 巨大的 ,反序列化不值得并行化。你正在做的那种 CPU 工作将在噪音中消失。

因此,您需要关注并发异步。
  • SemaphoreSlim正如您所发现的那样,这是标准模式。
  • TPL Dataflow 也可以进行并发(异步和并行两种形式)。
  • ForEachAsync可以采取多种形式;请注意,在 blog post你引用的,有 5 此方法的不同实现,每个实现都是有效的。 “[T] 这里有许多不同的可能用于迭代的语义,每一种都会导致不同的设计选择和实现。”出于您的目的(不想要 CPU 并行化),您不应该考虑使用 Task.Run 的那些。或分区。在异步并发世界中,任何 ForEachAsync实现只是隐藏它实现的语义的语法糖,这就是为什么我倾向于避免它。

    这给您留下了 SemaphoreSlim对比 ActionBlock .我一般建议人们从 SemaphoreSlim 开始首先,如果他们的需求变得更加复杂,请考虑转向 TPL Dataflow(在某种程度上,他们似乎会从数据流管道中受益)。

    例如,“部分要求是使并行度可配置。”

    您可以从允许一定程度的并发开始 - 其中被限制的事情是单个完整操作(从 url 获取数据,将下载的数据反序列化为对象,持久化到 crm 动态和另一个数据库,并更新第一个数据库)。这就是SemaphoreSlim将是一个完美的解决方案。

    但是您可能决定要拥有多个旋钮:例如,您下载的 url 数量为一个并发度,持久化的并发度为一个,而原始数据库的更新度为一个单独的并发度。然后您还需要限制这些点之间的“队列”:内存中只有这么多反序列化对象等 - 以确保使用慢速数据库的快速网址不会导致您的应用程序出现问题内存。如果这些是有用的语义,那么您已经开始从数据流的角度来解决问题,这就是使用像 TPL Dataflow 这样的库可能会更好地为您服务的重点。

    关于.net - TPL 数据流与普通信号量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51616072/

    相关文章:

    c# - 通过SID获取本地用户

    c# - 在这种情况下,我应该对所有可能的输入进行单元测试吗?

    multithreading - 有没有办法限制任务并行库使用的线程?

    c# - Visual Studio 2010 中 Parallel.Invoke 中无法捕获的异常

    c - 解决死锁时的奇怪行为

    c# - 如何通过 Entity 框架自动为 Oracle 数据库生成标识?

    .net - .Net中ConcurrentQueue和BlockingCollection有什么区别?

    c# - 如何填充排序列表的并发字典?

    MySQL InnoDB : A long semaphore wait:

    node.js - NodeJS 回调 : tracking db calls so I can terminate the process