我有一个 .NET 4.5.2 服务通过 MassTransit 向 RabbitMq 发布消息。
以及使用这些消息的 .NET Core 2.1 服务的多个实例。
目前,.NET 核心消费者服务的竞争实例会从其他实例中窃取消息。
即第一个使用该消息的实例将其从队列中取出,其余服务实例将不再使用它。
我希望所有 实例使用相同的消息。
我怎样才能做到这一点?
发布者服务配置如下:
builder.Register(context =>
{
MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);
return Bus.Factory.CreateUsingRabbitMq(configurator =>
{
configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
{
host.Username(***);
host.Password(***);
});
configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
configurator.Publish<MyWrapper>(x =>
{
x.AutoDelete = true;
x.Durable = true;
x.ExchangeType = true;
});
});
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();
.NET Core 消费者服务配置如下:
serviceCollection.AddScoped<MyWrapperConsumer>();
serviceCollection.AddMassTransit(serviceConfigurator =>
{
serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
{
hostConfigurator.Username(***);
hostConfigurator.Password(***);
});
cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
{
exchangeConfigurator.AutoDelete = true;
exchangeConfigurator.Durable = true;
exchangeConfigurator.ExchangeType = "topic";
exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
});
}));
});
serviceCollection.AddSingleton<IHostedService, BusService>();
然后 MyWrapperConsumer 看起来像这样:
public class MyWrapperConsumer :
IConsumer<MyWrapper>
{
.
.
public MyWrapperConsumer(...) => (..) = (..);
public async Task Consume(ConsumeContext<MyWrapper> context)
{
//Do Stuff
}
}
最佳答案
听起来您想发布消息并让多个消费者服务实例接收它们。在那种情况下,每个服务实例都需要有自己的队列。这样一来,每条已发布的消息都会将副本传送到每个队列。然后,每个接收端点将从自己的队列中读取该消息并使用它。
您所做的所有过度配置都与您想要的背道而驰。要使其工作,删除所有交换类型配置,并为每个服务实例配置一个唯一的队列名称(您可以从主机、机器等生成它),然后在消息生产者上调用 Publish。
您可以看到 RabbitMQ 拓扑是如何配置的:https://masstransit-project.com/advanced/topology/rabbitmq.html
关于c# - MassTransit - 多个消费者都可以收到相同的消息吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57209798/