rebus - 如何在消息处理程序中立即停止处理新消息?

标签 rebus rebus-rabbitmq

我有一个 Rebus 总线设置,只有一个工作人员,最大并行度为 1,可以“顺序”处理消息。如果处理程序失败,或出于特定业务原因,我希望总线实例立即停止处理消息。

我尝试使用 Rebus.Event 包来检测 AfterMessageHandled 处理程序中的异常并将工作人员数量设置为 0,但似乎在它真正成功停止单个工作人员实例之前处理了其他消息。

我可以在事件处理管道中的什么地方做 bus.Advanced.Workers.SetNumberOfWorkers(0); 为了防止进一步的消息处理?

我还尝试在处理程序本身的 catch block 内将工作人员数量设置为 0,但它似乎不是正确的地方,因为 SetNumberOfWorkers(0) 等待处理程序在返回之前完成并且调用者是处理程序......对我来说看起来像是某种僵局。

谢谢

最佳答案

这种特殊情况有点进退两难,因为正如您正确观察到的那样,SetNumberOfWorkers 是一个阻塞函数,它将等待直到达到所需的线程数。

在您的情况下,由于您将其设置为零,这意味着您的消息处理程序需要在线程数达到零之前完成......然后:💣☠🔒

我很抱歉这么说,因为我敢打赌你想这样做是因为你不知何故处于困境——但总的来说,我必须说,想要按顺序处理消息并按消息队列的顺序处理消息是在乞求麻烦,因为有太多事情会导致消息被重新排序。

但是,我认为你可以通过安装一个传输装饰器来解决你的问题,它会在切换时绕过真正的传输。如果装饰器随后从 Receive 方法返回 null,它将触发 Rebus 的内置退避策略并开始冷却(即它会增加轮询之间的等待时间运输)。

检查一下——首先,让我们创建一个简单的、线程安全的开关:

public class MessageHandlingToggle
{
    public volatile bool ProcessMessages = true;
}

(您可能想以某种方式结束并制作漂亮的内容,但现在应该这样做)

然后我们将在容器中将其注册为单例(假设此处为 Microsoft DI):

services.AddSingleton(new MessageHandlingToggle());

我们将使用 ProcessMessages 标志来指示是否应启用消息处理。

现在,当您配置 Rebus 时,您可以装饰传输并让装饰器访问容器中的切换实例:

services.AddRebus((configure, provider) =>
    configure
        .Transport(t => {
            t.Use(...);

            // install transport decorator here
            t.Decorate(c => {
                var transport = c.Get<ITransport>();
                var toggle = provider.GetRequiredService<MessageHandlingToggle>();
                return new MessageHandlingToggleTransportDecorator(transport, toggle);
            })
        })
        .(...)
);

所以,现在您只需要构建装饰器:

public class MessageHandlingToggleTransportDecorator : ITransport
{
    static readonly Task<TransportMessage> NoMessage = Task.FromResult(null);

    readonly ITransport _transport;
    readonly MessageHandlingToggle _toggle;

    public MessageHandlingToggleTransportDecorator(ITransport transport, MessageHandlingToggle toggle)
    {
        _transport = transport;
        _toggle = toggle;
    }   

    public string Address => _transport.Address;

    public void CreateQueue(string address) => _transport.CreateQueue(address);

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context) 
        => _transport.Send(destinationAddress, message, context);

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
        => _toggle.ProcessMessages 
            ? _transport.Receive(context, cancellationToken)
            : NoMessage;
}

如您所见,当 ProcessMessages == false 时,它只会返回 null。唯一剩下的就是决定何时再次恢复处理消息,以某种方式从容器中拉出 MessageHandlingToggle(可能是通过注入(inject)),然后将 bool 弹回到

我希望对你有用,或者至少给你一些关于如何解决你的问题的灵感。 🙂

关于rebus - 如何在消息处理程序中立即停止处理新消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69621988/

相关文章:

Rebus RabbitMQ IHandleMessage 不工作

c# - 通知消息的所有消费者

c# - Rebus 失败的消息会发生什么情况

c# - 一个工作单元内多个消息的相同 Rebus 处理程序实例

nservicebus - 我应该针对这种情况发布或发送消息吗?

rebus - 使用 T-SQL 将 Rebus 消息插入 SQL Server

c# - 将事件重新提交到 rebus