c# - 有效地等待一个或多个资源可用

标签 c# multithreading task-parallel-library multitasking

在我花太长时间重新发明轮子之前,我想检查一下 .Net 中是否已经有一个类可以满足我的要求。

我想要的是有点像 Semaphore(或者甚至可能像 CountdownEvent)但又略有不同的东西。

我有一个要求,我有不同数量的可用“资源”,并且我希望线程在可用资源为零时高效地等待。与此同时,另一个线程可以释放资源,这应该立即释放另一个等待线程。

这听起来很像信号量,但这并不是因为信号量(据我所知)在计数方面将每个线程视为“资源”。

无论如何,这是我对我想要的东西的第一个简单实现。它还没有处置、代码契约、错误处理、超时支持或取消支持,但它应该展示我想要的:

public sealed class ResourceCounter
{
    /// <summary>Create with the specified number of resources initially available.</summary>

    public ResourceCounter(int resourceCount)
    {
        _resourceCount = resourceCount;

        if (_resourceCount > 0)
        {
            _resourceAvailable.Set();
        }
    }

    /// <summary>Acquires a resource. Waits forever if necessary.</summary>

    public void Acquire()
    {
        while (true)
        {
            _resourceAvailable.Wait();

            lock (_lock)
            {
                if (_resourceCount > 0)
                {
                    if (--_resourceCount == 0)
                    {
                        _resourceAvailable.Reset();
                    }

                    return;
                }
            }
        }
    }

    /// <summary>Releases a resource.</summary>

    public void Release()
    {
        lock (_lock)
        {
            ++_resourceCount;
            _resourceAvailable.Set();
        }
    }

    private int _resourceCount;
    private readonly object _lock = new object(); 
    private readonly ManualResetEventSlim _resourceAvailable = new ManualResetEventSlim();
}

使用模式非常简单:

  1. 构造一个具有所需初始资源计数(可以是零或更多)的 ResourceCounter。

  2. 想要获取资源的线程调用 ResourceCounter.Acquire(),直到资源可用且已被获取后,该线程才会返回。

  3. 要释放资源的线程调用 ResourceCounter.Release(),它会释放资源并立即返回。

注意任何线程都可以释放资源;它不一定是获得资源的人。

我将其用作一些多线程管道代码的一部分,其中一个线程负责将工作项排队,多个线程正在处理工作项,另一个线程正在输出已处理的工作项。输出已处理工作项的线程必须对它们进行多路复用(因为处理线程可能以任何顺序输出已完成的项目),我需要一种机制来阻止工作项在多路复用器等待迟到的项目时无休止地排队。

(有关这方面的一些背景信息,请参阅 Pipelines, multiplexing, and unbounded buffering。)

无论如何,是否已经有可用的方法来执行此操作,或者我应该继续为此开发自己的类?


[编辑]

如下所述,SemaphoreSlim 做的事情完全正确。我拒绝了它,因为我认为调用 Wait() 的线程必须是调用 Release() 的线程,但事实并非如此。这就是我在周日编码时得到的...;)

最佳答案

使用队列进行通信更容易构建多级流水线架构。生产者线程将项目放入工作队列,一个或多个工作线程出队并处理项目,并将它们添加到输出队列。最后一个线程读取输出队列并输出数据。

在 .NET 中,这很容易用 BlockingCollection 完成.

参见 https://stackoverflow.com/a/5108487/56778以两级流水线为例。添加另一个阶段很简单。

为了处理输出线程乱序的问题,我使用最小堆将输出队列设为优先队列。我的项目由顺序记录号标识,因此输出线程知道接下来要输出哪个记录​​号。它将等待 AutoResetEvent 将项目放入队列(工作进程将在项目入队时设置事件)。然后输出线程会查看最上面的项目,看看它是否与预期的项目相匹配。如果没有,它将再次等待该事件。

效果很好,因为它消除了第二个队列。该 block 在它所属的输出队列中。性能非常适合我的目的。将项目入队是一个 O(log n) 操作,但实际上 n 非常小。即使队列中有 100,000 个项目,与处理一条记录所需的时间相比,将一个项目排入队列所需的时间也微不足道。

您仍然可以为此使用 BlockingCollection。您只需让一个二叉堆类实现 IProducerConsumerCollection 接口(interface)。我通过向我在 A Generic BinaryHeap class 中发布的简单二进制堆类添加锁来做到这一点.然后,您可以将其中之一提供给 BlockingCollection 构造函数,如下所示:

BlockingCollection<MyRecord> = 
    new BlockingCollection<ConcurrentBinaryHeap<MyRecord>>(
    new ConcurrentBinaryHeap<MyRecord>, MaxQueueSize);

不过,这里存在一个潜在的僵局。如果队列已满(即超过您在初始化 BlockingCollection 时设置的最大值),则迟到的线程无法将项目排入队列,所有工作都会完全停止。这在实践中从未发生在我身上,因为尽管我的每条记录处理时间各不相同,但它们并没有那么变化。

如果担心,您可以增加队列大小(只有当您可以肯定地说您永远不会填满队列时才有效),或者为下一个要发布的预期项目提供备用 channel ,如果队列已满。我做到了这一点,但就我的目的而言,增加队列大小更容易。

如果您有兴趣,我可以翻阅我的文件以找到 ConcurrentBinaryHeap 类。

关于c# - 有效地等待一个或多个资源可用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15052859/

相关文章:

c# - 没有 UI 线程的任务同步

c++ - 即使删除线程,线程数也会增加很多

c# - 如何将带有回调参数的方法包装到任务中?

c# - 如何在 lambda 表达式中组合多个语句

c# - 无法在 Mono 和 Linux 下构建 NUnit 测试项目

c++ - posix 中的内核线程

java - 是否保证 ConcurrentHashMap.get() 可以通过不同的线程看到以前的 ConcurrentHashMap.put() ?

c# - 是否建议将 prevTask.Wait() 与 ContinueWith(来自任务库)一起使用?

c# - 检查循环中的增量是否在一定范围内的最佳方法?

c# - 从源代码管理丢失后如何生成新的 .pfx 文件?