c# - Task Parallel Library中的BlockingCollection不会自动释放底层实例的引用

标签 c# multithreading task-parallel-library

我使用 BlockingCollection 在 C# 4.0 中实现生产者-消费者模式。

BlockingCollection 包含占用大量内存的项。我想让生产者一次从 BlockingCollection 中取出一个项目,并对其进行处理。

我在想,通过在 BlockingCollection.GetConsumingEnumerable() 上使用 foreach,每次 BlockingCollection 都会从底层队列中删除项目(这意味着所有与引用),因此在处理项目的 Process() 方法结束时,可以对项目进行垃圾回收。

但这不是真的。 BlockingCollection.GetConsumingEnumerable() 上的 foreach 循环似乎确实保存了进入队列的项目的所有引用。在退出 foreach 循环之前,将保留所有项(从而防止被垃圾收集)。

我没有在 BlockingCollection.GetConsumingEnumerable() 上使用简单的 foreach 循环,而是使用 while 循环测试 BlockingCollection.IsComplete 标志,并在循环内使用 BlockingCollection.Take() 获取消耗品。我假设 BlockingCollection.Take() 具有与 List.Remove() 类似的效果,它将从 BlockingCollection 中删除项目的引用。但这又是错误的。所有项目仅在 while 循环外被垃圾收集。

所以我的问题是,我们如何才能轻松实现 BlockingCollection 可能包含内存消耗项并且每个项在被消费者使用后都可以被垃圾回收的要求?非常感谢您的帮助。

编辑:根据要求,添加了一个简单的演示代码:

// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get{ return id_; } }
    public Entity() { id_ = counter++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}

...

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();

// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity);
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity);
    Task.WaitAll(tasks_.ToArray());
}

// The producer creates Entity instances and add them to BlockingCollection.
void ProduceEntity()
{
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}

// The consumer takes entity, process it (and what I need: destroy it).
void ConsumeEntity()
{
    while(!jobQueue_.IsCompleted){
        Entity entity;
        if(jobQueue_.TryTake(entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;

            // I would assume after GC, the entity will be finalized and garbage collected, but NOT.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}

输出是所有创建和处理消息,后面是“完成处理”。然后是来自实体的所有销毁消息。创建实体消息显示 Entity.ID 从 0 到 9,销毁消息显示 Entity.ID 从 9 到 0。

编辑:

即使我设置了 BlockingCollection 的绑定(bind)容量,所有进入它的项目也只有在循环退出时才会结束,这很奇怪。

最佳答案

ConcurrentQueue 包含具有 32 个项目的内部数组的段。在段被垃圾回收之前,实体项目不会被垃圾回收。这将在从队列中取出所有 32 个项目后发生。如果您将示例更改为添加 32 个项目,您将在“完成处理”之前看到“正在销毁实体”消息。

关于c# - Task Parallel Library中的BlockingCollection不会自动释放底层实例的引用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3984582/

相关文章:

c# - 如何在 visual studio 扩展中重命名嵌套文件

c# - 检查数字数组是否连续的功能方法

java - 线程转储 : How to see the condition of waiting/or any other condition?

c# - 并行嵌套操作返回奇怪的结果

c# - 需要当前 Request.Url 的类型的 Ninject 绑定(bind)

multithreading - Spring @Controller 生命周期

java - 使用 Junit 测试 Runnable 方法,for 循环内部的逻辑意外地只执行一次

c# - Parallel.ForEach 和无法从关闭的 TextReader 异常中读取

.net - 捕获任务中的异常

c# - 如何使用 View 模型