c# - MassTransit - 多个消费者都可以收到相同的消息吗?

标签 c# .net .net-core rabbitmq masstransit

我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。

以及使用这些消息的 .NET Core 2.1 服务的多个实例。

目前,.NET 核心消费者服务的竞争实例会从其他实例中窃取消息。

即第一个使用该消息的实例将其从队列中取出,其余服务实例将不再使用它。

我希望所有 实例使用相同的消息。

我怎样才能做到这一点?

发布者服务配置如下:

 builder.Register(context =>
            {
                MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

                return Bus.Factory.CreateUsingRabbitMq(configurator =>
                {
                    configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                    {
                        host.Username(***);
                        host.Password(***);
                    });
                    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                    configurator.Publish<MyWrapper>(x =>
                    {
                        x.AutoDelete = true;
                        x.Durable = true;
                        x.ExchangeType = true;
                    });

                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();

.NET Core 消费者服务配置如下:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();

然后 MyWrapperConsumer 看起来像这样:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}

最佳答案

听起来您想发布消息并让多个消费者服务实例接收它们。在那种情况下,每个服务实例都需要有自己的队列。这样一来,每条已发布的消息都会将副本传送到每个队列。然后,每个接收端点将从自己的队列中读取该消息并使用它。

您所做的所有过度配置都与您想要的背道而驰。要使其工作,删除所有交换类型配置,并为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。

您可以看到 RabbitMQ 拓扑是如何配置的:https://masstransit-project.com/advanced/topology/rabbitmq.html

关于c# - MassTransit - 多个消费者都可以收到相同的消息吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57209798/

相关文章:

C#.Net Windows 消息到 Delphi

c# - 弹道库

msbuild - .NET Core(非 ASP.NET Core)项目的文件可以嵌套在 VS2017 解决方案资源管理器中吗?

http - ASP Core API - 自定义未经授权的正文

c# - MVC4中两个变量的RequiredIf条件验证

c# - 将 CosmosDB const 连接字符串注入(inject) Azure Function V4 CosmosDB 输入/输出绑定(bind)?

C# MVC : Trailing equal sign in URL doesn't hit route

c# - 如何确定不同范围内两个变量的引用相等性?

c# - CancellationTokenSource 需要建议

ubuntu - .Net 核心 sdk 在 ubuntu 18.10