rabbitmq - MassTransit 丢失消息 - Rabbit MQ - 当发布者和消费者端点名称相同时,

标签 rabbitmq masstransit

如果您使用相同的端点名称创建发布者和使用者,我们遇到了 MassTransit 丢失消息的情况。

注意下面的代码;如果我为消费者或发布者使用不同的端点名称(例如,发布者为“rabbitmq://localhost/mtlossPublised”),则该消息将同时计算已发布和已消费匹配;如果我使用相同的端点名称(如示例中所示),那么我消耗的消息比发布的消息少。

这是预期的行为吗?或者我做错了什么,下面的工作示例代码。

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();
        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        string Message { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public string Message { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}

最佳答案

最重要的是,总线的每个实例都需要它自己的队列来读取。即使总线只是为了发布消息而存在。这只是 MassTransit 工作方式的要求。

http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 看到警告。

当两个总线实例共享同一个队列时,我们将行为保留为未定义。无论如何,这不是我们支持的条件。每个总线实例可能会向其他总线实例发送元数据,并且需要它自己的端点。这对 MSMQ 来说是一个更大的交易,所以也许我们可以让这个案例在 RabbitMQ 上工作——但这​​不是我们在这一点上花太多时间考虑的事情。

关于rabbitmq - MassTransit 丢失消息 - Rabbit MQ - 当发布者和消费者端点名称相同时,,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12458175/

相关文章:

php - Laravel Queue Worker、RabbitMQ 和远程生成的运行作业

node.js - 如何使用 Node.js 和分布在多个监听器之间的 API 构建消息传递总线?

c# - 如何编写 MassTransitStateMachine 单元测试?

c# - 公共(public)交通过滤器在消费时收到消息

c# - 当客户端花费超过 60 秒来确认消息时,Rabbitmq 服务器断开连接

java - Spring AMQP动态创建RabbitTemplate和SimpleMessageListenerContainer,报错RabbitTemplate is not配置为MessageListener

c# - 抽象出 MassTransit 依赖

c# - MassTransit:是否可以非通用地注册处理程序?

c# - MassTransit 3 与 Azure 服务总线创建队列

java - Spring RabbitTemplate - 如何在发送时自动创建队列