经过一周的编码和搜索论坛,似乎是时候问...
我有一个 C# 应用程序,它使用 EventingBasicConsumer 处理 RabbitMQ 发送的消息。我想同时处理多条消息,所以我在同一个连接上实例化了几个 channel (在本例中为 8 个),每个 channel 都有一个消费者。然后我将一个事件处理程序附加到每个消费者的 Received 事件。根据我目前的所有阅读,此设置应该允许事件处理程序由消费者并发触发,每个消费者都在自己的线程中运行。但在我的例子中,消费者仅在前一个消费者确认其消息后才按顺序接收消息。
有没有其他人经历过这种行为?我的理解是否正确,在这种情况下处理在技术上应该是并发的?
下面是一个基本代码,可以更好地说明问题:
Initialise() {
ConsumerChannels_ = new IModel[ConsumerCount_];
Consumers_ = new EventingBasicConsumer[ConsumerCount_];
for (int i = 0; i < ConsumerCount_; ++i)
{
ConsumerChannels_[i] = Connection_.CreateModel();
Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
Consumers_[i].Received += MessageReceived;
}
}
MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
int id = GetConsumerIndex(sender);
Log_.Debug("Consumer " + id + ": processing started...");
// do some time consuming processing here
sender.Model.BasicAck(e.DeliveryTag, false);
Log_.Debug("Consumer " + id + ": processing ended.");
}
我希望看到的是://并发处理
Consumer 1: processing started...
Consumer 2: processing started...
Consumer 3: processing started...
...
Consumer 6: processing ended.
Consumer 7: processing ended.
Consumer 8: processing ended.
但我得到的是://顺序处理
Consumer 1: processing started...
Consumer 1: processing ended.
Consumer 2: processing started...
Consumer 2: processing ended.
...
Consumer 8: processing started...
Consumer 8: processing ended.
如有任何关于如何进行的想法,我们将不胜感激。
最佳答案
您实际上可以在创建 ConnectionFactory
时设置并行处理任务的数量!
ConnectionFactory factory = new ConnectionFactory
{
ConsumerDispatchConcurrency = 2,
};
默认值为1,即serial/sequential processing。
我通过剖析 .NET client's source code 发现了这一点.这是有趣的部分(concurrency
是从 ConsumerDispatchConcurrency
设置的):
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
但请注意,这可能会导致竞争条件!该属性有此备注:
For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. In addition to that consumers need to be thread/concurrency safe.
关于c# - RabbitMQ 中的并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40338774/