c# - RabbitMQ 中的并发

标签 c# multithreading concurrency rabbitmq

经过一周的编码和搜索论坛,似乎是时候问...

我有一个 C# 应用程序,它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息。我想同时处理多条消息,所以我在同一个连接上实例化了几个 channel (在本例中为 8 个),每个 channel 都有一个消费者。然后我将一个事件处理程序附加到每个消费者的 Received 事件。根据我目前的所有阅读,此设置应该允许事件处理程序由消费者并发触发,每个消费者都在自己的线程中运行。但在我的例子中,消费者仅在前一个消费者确认其消息后才按顺序接收消息。

有没有其他人经历过这种行为?我的理解是否正确,在这种情况下处理在技术上应该是并发的?

下面是一个基本代码,可以更好地说明问题:

Initialise() {
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
         ConsumerChannels_[i] = Connection_.CreateModel();
         Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
         Consumers_[i].Received += MessageReceived;
    }
}

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}

我希望看到的是://并发处理

Consumer 1: processing started...

Consumer 2: processing started...

Consumer 3: processing started...

...

Consumer 6: processing ended.

Consumer 7: processing ended.

Consumer 8: processing ended.

但我得到的是://顺序处理

Consumer 1: processing started...

Consumer 1: processing ended.

Consumer 2: processing started...

Consumer 2: processing ended.

...

Consumer 8: processing started...

Consumer 8: processing ended.

如有任何关于如何进行的想法,我们将不胜感激。

最佳答案

您实际上可以在创建 ConnectionFactory 时设置并行处理任务的数量!

ConnectionFactory factory = new ConnectionFactory
{
    ConsumerDispatchConcurrency = 2,
};

默认值为1,即serial/sequential processing。

我通过剖析 .NET client's source code 发现了这一点.这是有趣的部分(concurrency 是从 ConsumerDispatchConcurrency 设置的):

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
    _worker = Task.Run(loopStart);
}
else
{
    var tasks = new Task[concurrency];
    for (int i = 0; i < concurrency; i++)
    {
        tasks[i] = Task.Run(loopStart);
    }
    _worker = Task.WhenAll(tasks);
}

但请注意,这可能会导致竞争条件!该属性有此备注:

For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. In addition to that consumers need to be thread/concurrency safe.

关于c# - RabbitMQ 中的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40338774/

相关文章:

c# - 如何在 MVC4 网站 : do I really need to expose my dev environment? 中集成和测试 PayPal

Java-重用具有相同运行但不同参数的线程

c# - 尝试设置本地计算机帐户密码时出现未知错误 (0x80005000)

c# - 按钮样式在运行时有效,但 Visual Studio 在设计时显示错误

c# - 在 Monodroid 上与 GoogleMaps 叠加

java - 在多线程 Web 应用程序中访问请求范围的 beans

用于解决性能问题的 Java 分析器

java - Java 的 java.util.concurrent 包的 .NET 等价物是什么?

java - 如何在@GuardedBy注释中使用多个互斥体?

rust - 当接收 channel 需要包装在互斥体中以从多个 Therad 读取时,多生产者多消费者 channel 的意义何在?