c# - 队列的并行处理

标签 c# multithreading thread-safety threadpool message-queue

我试图找到一种方法来在多个线程中处理队列,动态调整消费者的数量。基本上任务是众所周知的:多个生产者创建消息并将它们提交到队列中,多个消费者处理来自队列的消息。现在,我考虑了使用 System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue 和 System.Collections.Concurrent.BlockingCollection 等不同组件的不同方法,但我无法决定如何正确地使用最大效率,所以我很高兴通过您的输入收到一些绝妙的想法。
以下是更多详细信息:

  • 消息率在某些情况下预计会非常密集,但处理将相对简单;
  • 我不知道我应该有多少消费者;
  • 我希望进程根据排队的消息数量调整当前消费者的数量,而不是阻止他们(这意味着我想为每百条消息填充额外的消费者 f.e.,并且消费者应该如果排队消息的数量比填充它所需的数量少 50 个,则停止,例如,当消息数量超过 300 时,将填充第三个消费者,当它下降到 250 时,它应该停止)。

这就是想法。现在,我考虑将 ConcurrentQueue 包装到一个类中,该类将封装 Enqueue 方法,并在入队后检查消息数量,并决定是否启动额外的消费者。并且消费者应该在循环中进行检查,以决定是否停止它。我认为您会提出一些更有趣的解决方案。

顺便说一句,理论上我仍然不知道如何处理的情况之一是当最后一条消息正在排队时,同时最后一个消费者已经停止。另一种情况也与停止有关——如果多个消费者同时进行停止检查,他们将被停止。我应该如何处理这些情况?

为了证明我的意思,请考虑以下示例:

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    int amountOfConsumers;

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message); // point two

        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
        {
            Task.Factory.StartNew(() =>
            {
                IMessage msg;
                while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
                {
                    msg = messageQueue.Take();
                    //process msg...
                }

                ConsumerQuit(); // point four
            });

            Interlocked.Increment(ref amountOfConsumers);
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}

所以现在当我可以指出具体的代码行时,这些就是问题:

  • 当最后一个消费者发现没有消息入队时(@point one) 在它调用ConsumerQuit 方法之前,最后一个消息到达并入队,然后完成对其他消费者的检查,结果是(@第三点)仍然有一个消费者在工作,并且由于单个消息的一个消费者已经足够了 - 没有任何反应,然后最终调用了 ConsumerQuit,并且我有一条消息卡在队列中。
ConsumerTask                       | LastMessageThread
------------------------------------------------------
@point one(messageQueue.Count=0)   | @point two
no time                            | @point three(amountOfConsumers=1)
@point four                        | ended;
ended;                             | ended;
  • 当其中一个应该停止时(例如 messageQueue.Count 为 249),多个消费者同时到达“第一点”检查,其中几个将停止,因为在其中一个调用 ConsumerQuit 之前,其他几个会执行这张支票也是。
ConsumerTask1                  | ConsumerTask2| ConsumerTask3 | ConsumerTask4|
------------------------------------------------------------------------------
@point one(.Count=249;amount=4)| no time      | no time       | @point one   |
no time                        | @point one   | processing msg| @point four  |
@point four                    | no time      | @point one    | ended;       |
ended;                         | @point four  | processing msg| ended;       |
ended;                         | ended;       | ...           | ended;       |

Here, in case when the last message is already enqueued, we have one consumer task left that has to handle 249 messages alone, however the worst case can be if all them will halt, after the last message, potentialy hundreds of messages will stuck.

最佳答案

看来我终于想出了一个解决方案,但不确定性能如何。请考虑以下代码,任何反馈将不胜感激!我仍然希望看到一些其他的解决方案或想法,即使它们将完全不同并且需要在方法上进行重大改变。这是目标:“一种在多个线程中处理队列,动态调整消费者数量的方法”

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    private ManualResetEvent mre = new ManualResetEvent(true);

    private int amountOfConsumers;

    object o = new object();

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message);

        mre.WaitOne();
        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers)
        {
            Interlocked.Increment(ref amountOfConsumers);

            var task = Task.Factory.StartNew(() =>
            {
                IMessage msg;
                bool repeat = true;

                while (repeat)
                {
                    while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers))
                    {
                        msg = messageQueue.Take();
                        //process msg...
                    }

                    lock (o)
                    {
                        mre.Reset();

                        if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51) / 100)) < amountOfConsumers))
                        {
                            ConsumerQuit();
                            repeat = false;
                        }

                        mre.Set();
                    }
                }
            });
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}

关于c# - 队列的并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15169429/

相关文章:

python - 一旦没有线程请求它,垃圾收集锁

java - BlockingQueue 的 drainTo() 方法的线程安全

c# - CRM 2011 Online - 插件中的无限循环错误 - 无法找到它

C# .Net Socket 保持事件时间?

python - Python 中的 All-to-All 线程通信

node.js - "bull"是否利用多核作为 node.js 运行单线程

java/swing : gui froze, 没有线程挂起

java - 在线程中使用静态变量作为不同实例之间的通信

c# - 通过LiangBarsky算法裁剪线C#

c# - 在 C# 中使用 DNS SRV 记录