masstransit - 总线连接时断开消费者连接

标签 masstransit

我们有以下用例:

我们有两条总线(内部和外部)。这个想法是我们自己的服务使用内部总线,所有第三方服务使用外部总线。我们创建了一个充当消息路由器的服务(恰本地命名为 MessageRouter)。

当消息在内部总线上发布时,MessageRouter 可以拾取该消息并将其放置在外部总线上,反之亦然。通过配置我们可以知道哪些消息可以从内部传递到外部,或者从外部传递到内部。这本身就很好用。

我们已经对消息进行了分组,我们有事件、命令和请求。每个服务都有三个 ReceiveEnpoint,每个消息类型对应一个。这意味着所有事件都在事件队列等中排队。服务“订阅”的每条消息都会获得它自己的消费者。

var queues = new Dictionary<string, IEnumerable<Type>>
{
    // ReSharper disable once PossibleMultipleEnumeration
    { "Events", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IEvent).IsAssignableFrom(ga))) },
    // ReSharper disable once PossibleMultipleEnumeration
    { "Commands", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(ICommand).IsAssignableFrom(ga))) },
    // ReSharper disable once PossibleMultipleEnumeration
    { "Queries", consumerTypes.Where(ct => ct.GetGenericArguments().Any(ga => typeof(IQuery).IsAssignableFrom(ga))) }
};

foreach (var queue in queues)
{
    config.ReceiveEndpoint(GetConsumerQueueName(queue.Key), cfg =>
    {
        foreach (var consumerType in queue.Value)
        {
            cfg.Consumer(consumerType, consumerContainer.Resolve);
        }
        configurator?.Invoke((T)cfg);
    });
}

其中config是一个IBusFactoryConfigurator。此代码在服务启动时调用。

我们希望能够在 MessageRouter 中做的是“动态”添加消费者,更重要的是,从 ReceiveEndpoint 中删除消费者。

到目前为止,我们还没有任何运气。我们尝试通过在 BusControl 实例上使用 ConnectConsumer 方法来添加 Consumer。这给了我们一个 ConnectHandle ,它有一个 Disconnect 方法。然而,当使用这种方法时,我们的消费者不会接收消息。查看该句柄会发现它是一个 MultipleConnectHandle,但是该句柄没有“内部”句柄。

有没有办法使用Consumer方法来注册不同的消费者并获取他们的ConnectHandle,以便我们可以Disconnect消费者是否需要?

如上所述,理想情况下我们希望能够向 ReceiveEndpoint 动态添加和删除消费者。

最佳答案

总线启动时,您无法在接收端点上添加/删除使用者,因为总线启动时不受任何方式、形状或形式的支持。

但是,您可以将单独队列上的新接收端点与一个或多个使用者连接。在上面的例子中,您似乎没有在连接接收端点之前注册消费者,这就是为什么您在句柄集合中看不到任何内容。

关于masstransit - 总线连接时断开消费者连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49118286/

相关文章:

c# - MassTransit RabbitMq 发送消息

c# - MassTransit - 等待所有事件完成,然后继续处理

rabbitmq - Masstransit RabbitMq 自动删除仲裁队列

rabbitmq - 使用内存总线对公共(public)交通中的发布方法进行单元测试

c# - MassTransit 在 Azure 服务总线上创建队列

c# - 如何在完成 "event"时连接到 TransactionScope?

c# - 无法使用 MSMQ 让 MassTransit 在 IIS Express 上工作

c# - 如何将 MassTransit 演示分离为生产者和消费者应用程序

.net - 通过Unity的同一条消息的多个使用者在MassTransit中不起作用