c# - 如何从优先于第一个集合的任意两个 BlockingCollections 中取出一个项目?

标签 c# producer-consumer

我有两个 BlockingCollection<T> 对象,collection1collection2 .我想使用这些集合中的项目,优先考虑 collection1 中的项目.也就是说,如果两个集合都有项目,我想从 collection1 中取出项目第一的。如果他们都没有元素,我想等待元素可用。

我有以下代码:

public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T:class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    T item2;

    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }

    return item2;
}

此代码预计返回 null什么时候CompleteAdding在两个集合上都被调用,它们都是空的。

此代码的主要问题是 TakeFromAny 的文档方法指定 TakeFromAny会抛出 ArgumentException如果CompleteAdding被称为“集合”:

ArgumentException

The collections argument is a 0-length array or contains a null element or CompleteAdding() has been called on the collection.

如果 CompleteAdding 是否抛出被召集到任何集合?还是两个集合?

如果 CompleteAdding 会怎样被调用并且集合中还有一些项目,它会抛出吗?

我需要一种可靠的方法来做到这一点。

在这段代码中,我试图从 collection1 获取首先是因为 TakeFromAny 的文档如果两个集合都有项目,则不对从中获取项目的收集顺序提供任何保证。

这也意味着如果两个集合都是空的,然后它们稍后同时收到项目,那么我可能会从 collection2 中获得一个项目。首先,这很好。

编辑:

我将项目添加到两个集合(而不仅仅是一个集合)的原因是第一个集合没有上限,而第二个集合有。

为那些对我为什么需要这个感兴趣的人提供更多详细信息:

我在一个名为 ProceduralDataflow 的开源项目中使用它。更多详情请看这里https://github.com/ymassad/ProceduralDataflow

数据流系统中的每个处理节点都有两个集合,一个集合将包含第一次出现的项目(因此如果需要我需要减慢生产者的速度),另一个集合将包含第二次(或第三次)出现的项目,..) 次(由于数据流中的循环)。

一个集合没有上限的原因是我不希望由于数据流中的循环而出现死锁。

最佳答案

首先,简短回答您的具体问题。

Does it throw if CompleteAdding was called on any collection? or both collections?

两者(全部)- 但前提是任何集合中都没有可用元素。

What if CompleteAdding was called and the collection still has some items, does it throw?

没有。如果集合中有可用的元素,则将其从集合中移除并返回给调用者。

结论

显然文档不清楚。部分

or CompleteAdding() has been called on the collection

应该有不同的表述——像

or there is no available element in any of the collections and CompleteAdding() has been called on all the collections

基本原理

好吧,我知道依赖实现不是一个好的做法,但是当文档不清楚时,实现是我能想到的唯一可靠的官方来源。所以服用reference source , TakeFromAnyTryTakeFromAny 都调用私有(private)方法 TryTakeFromAnyCore .它以以下内容开头:

ValidateCollectionsArray(collections, false);

false 这里是一个名为 isAddOperationbool 参数,在 ValidateCollectionsArray 中使用。如下:

if (isAddOperation && collections[i].IsAddingCompleted)
{
    throw new ArgumentException(
        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}

对于调用 CompleteAdding() 的集合,这是可能抛出 ArgumentException 的地方之一。正如我们所见,情况并非如此(问题 #1)。

然后执行将继续以下“快速路径”:

//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
        return i;
}

这证明了问题 #2 的答案。

最后,如果在任何集合中都没有可用元素,则实现通过调用另一个私有(private)方法采取“慢路径”TryTakeFromAnyCoreSlow ,以下评论是对已实现行为的基本解释:

//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw

我们两个问题的答案都在案例 #1 和案例 #5 中(注意单词 all)。顺便说一句,它还显示了 TakeFromAnyTryTakeFromAny 之间的唯一区别 - 情况 #4 与 #5,即 throwreturn -1

关于c# - 如何从优先于第一个集合的任意两个 BlockingCollections 中取出一个项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45947833/

相关文章:

c# - 带有路由 id 参数的 Html.BeginForm 并在提交时获取参数?

c# - EF Core 中已修改实体的自有类型属性不持久

c++ - 具有pthread和锁且无提升的单读者多作者

multithreading - 线程间通信

Java:实现多线程供应商/消费者管道,每种任务具有并行限制

java - 方法 wait() 和 notifyAll() 不是静态的

c# - 使 Enter 键像提交按钮一样

c# - Asp.Net Core 未找到 DefaultChallengeScheme

c# - 当 GC 开始运行时,我如何获得信号?

c - C 中的 Pthread_Create 导致奇怪的输出