我正在使用(坦率地说很棒)BlockingCollection<T>
键入大量多线程的高性能应用。
集合的吞吐量很大,在微观层面上它的性能很高。但是,对于每个“批处理”,它将始终通过标记取消 token 来结束。这会导致在任何等待中抛出异常 Take
称呼。很好,但我会选择一个返回值或输出参数来发出信号,因为 a) 异常有明显的开销 b) 在调试时,我不想手动关闭那个特定的异常中断异常。
实现似乎很紧张,理论上我想我可以反汇编并重新创建我自己的不使用异常的版本,但也许有更简单的方法?
我可以添加 null
(或者,如果不是,一个占位符)对象到集合以表示进程应该结束,但是还需要有一种很好地中止的方法,即唤醒等待线程并以某种方式告诉它们发生了什么事。
那么 - 替代集合类型?重新创建我自己的?有什么方法可以滥用这个?
(一些上下文:我选择了 BlockingCollection<T>
因为它比围绕 Queue
手动锁定有优势。据我所知,线程原语的使用非常棒,在我的例子中,这里是几毫秒 - 并且-最佳核心是使用的关键。)
编辑:我刚刚为这个打开了赏金。我不相信Anastasiosyal's answer涵盖了我在评论中提出的疑问。我知道这是一个棘手的问题。有人可以提供帮助吗?
最佳答案
正如我猜你自己已经完成的那样,查看 BlockingCollection 的反射源,不幸的是,当 CancellationToken 被传递到 BlockingCollection 并且它取消时,你将得到 OperationCancelledException,如下图所示(与图片后的几个解决方法)
GetConsumingEnumerable
在 BlockingCollection 上调用 TryTakeWithNoTimeValidation
,这又会引发此异常。
解决方法 #1
一个可能的策略是,假设您对生产者和消费者有更多的控制权,而不是将取消 token 传递给 BlockingCollection(这将引发此异常),您将取消 token 传递给生产者和消费者.
如果您的生产者没有生产并且您的消费者没有消费,那么您已经有效地取消了操作而不引发此异常并通过在您的 BlockingCollection 中传递 CancellationToken.None。
特殊情况 当 BlockingCollection 为 BoundedCapacity 或 Empty 时取消
生产者阻塞:当达到 BlockingCollection 上的 BoundedCapacity 时,生产者线程将被阻塞。因此,当尝试取消并且 BlockingCollection 处于 BoundedCapacity 时(这意味着您的消费者未被阻止但生产者被阻止,因为他们无法向队列中添加任何其他项目)那么您将需要允许使用其他项目(一个对于每个生产者线程),这将解除对生产者的阻塞(因为它们在添加到 blockingCollection 时被阻塞)并反过来允许您的取消逻辑在生产者端启动。
消费者阻塞:当您的消费者因为队列为空而被阻塞时,您可以在阻塞集合中插入一个空的工作单元(每个消费者线程一个)以解除对消费者的阻塞线程并允许您的取消逻辑进入消费者端。
当队列中有项目并且没有达到 BoundedCapacity 或 Empty 等限制时,生产者和消费者线程不应被阻塞。
解决方法 #2
使用取消工作单元。
当您的应用程序需要取消时,您的生产者(可能只有 1 个生产者就足够了,而其他生产者只是取消生产)将产生一个取消工作单元(可以是 null,正如您提到的那样,或者是某个实现标记接口(interface)的类).当消费者使用这个工作单元并检测到它实际上是一个取消工作单元时,他们的取消逻辑就会启动。要生成的取消工作单元的数量需要等于消费者线程的数量。
同样,当我们接近 BoundedCapacity 时需要谨慎,因为这可能是一些生产者被阻止的迹象。根据生产者/消费者的数量,您可以让消费者消费直到所有生产者(但 1 个)都关闭。这确保周围没有挥之不去的生产者。当只剩下 1 个生产者时,您的最后一个消费者可以关闭,生产者可以停止生产取消的工作单元。
关于c# - 使用 BlockingCollection<T> : OperationCanceledException, 有更好的方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8953407/