我有两个 BlockingCollection<T>
对象,collection1
和 collection2
.我想使用这些集合中的项目,优先考虑 collection1
中的项目.也就是说,如果两个集合都有项目,我想从 collection1
中取出项目第一的。如果他们都没有元素,我想等待元素可用。
我有以下代码:
public static T Take<T>(
BlockingCollection<T> collection1,
BlockingCollection<T> collection2) where T:class
{
if (collection1.TryTake(out var item1))
{
return item1;
}
T item2;
try
{
BlockingCollection<T>.TakeFromAny(
new[] { collection1, collection2 },
out item2);
}
catch (ArgumentException)
{
return null;
}
return item2;
}
此代码预计返回 null
什么时候CompleteAdding
在两个集合上都被调用,它们都是空的。
此代码的主要问题是 TakeFromAny
的文档方法指定 TakeFromAny
会抛出 ArgumentException
如果CompleteAdding
被称为“集合”:
ArgumentException
The collections argument is a 0-length array or contains a null element or CompleteAdding() has been called on the collection.
如果 CompleteAdding
是否抛出被召集到任何集合?还是两个集合?
如果 CompleteAdding
会怎样被调用并且集合中还有一些项目,它会抛出吗?
我需要一种可靠的方法来做到这一点。
在这段代码中,我试图从 collection1
获取首先是因为 TakeFromAny
的文档如果两个集合都有项目,则不对从中获取项目的收集顺序提供任何保证。
这也意味着如果两个集合都是空的,然后它们稍后同时收到项目,那么我可能会从 collection2
中获得一个项目。首先,这很好。
编辑:
我将项目添加到两个集合(而不仅仅是一个集合)的原因是第一个集合没有上限,而第二个集合有。
为那些对我为什么需要这个感兴趣的人提供更多详细信息:
我在一个名为 ProceduralDataflow 的开源项目中使用它。更多详情请看这里https://github.com/ymassad/ProceduralDataflow
数据流系统中的每个处理节点都有两个集合,一个集合将包含第一次出现的项目(因此如果需要我需要减慢生产者的速度),另一个集合将包含第二次(或第三次)出现的项目,..) 次(由于数据流中的循环)。
一个集合没有上限的原因是我不希望由于数据流中的循环而出现死锁。
最佳答案
首先,简短回答您的具体问题。
Does it throw if
CompleteAdding
was called on any collection? or both collections?
两者(全部)- 但前提是任何集合中都没有可用元素。
What if
CompleteAdding
was called and the collection still has some items, does it throw?
没有。如果集合中有可用的元素,则将其从集合中移除并返回给调用者。
结论
显然文档不清楚。部分
or
CompleteAdding()
has been called on the collection
应该有不同的表述——像
or there is no available element in any of the collections and
CompleteAdding()
has been called on all the collections
基本原理
好吧,我知道依赖实现不是一个好的做法,但是当文档不清楚时,实现是我能想到的唯一可靠的官方来源。所以服用reference source , TakeFromAny
和 TryTakeFromAny
都调用私有(private)方法 TryTakeFromAnyCore
.它以以下内容开头:
ValidateCollectionsArray(collections, false);
false
这里是一个名为 isAddOperation
的 bool
参数,在 ValidateCollectionsArray
中使用。如下:
if (isAddOperation && collections[i].IsAddingCompleted)
{
throw new ArgumentException(
SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}
对于调用 CompleteAdding()
的集合,这是可能抛出 ArgumentException
的地方之一。正如我们所见,情况并非如此(问题 #1)。
然后执行将继续以下“快速路径”:
//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
// Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
return i;
}
这证明了问题 #2 的答案。
最后,如果在任何集合中都没有可用元素,则实现通过调用另一个私有(private)方法采取“慢路径”TryTakeFromAnyCoreSlow
,以下评论是对已实现行为的基本解释:
//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw
我们两个问题的答案都在案例 #1 和案例 #5 中(注意单词 all)。顺便说一句,它还显示了 TakeFromAny
和 TryTakeFromAny
之间的唯一区别 - 情况 #4 与 #5,即 throw
与 return -1
。
关于c# - 如何从优先于第一个集合的任意两个 BlockingCollections 中取出一个项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45947833/