画画传奇 : Message is not arrived in correct sequence when subscriber(s) sent to publisher

标签 rebus

最近,我们将 rebus 版本从 1 升级到 5,之后我们遇到了 Saga 处理程序的问题。现在我们没有从不同的订阅者处以正确的顺序获得响应。

我们有不同的源来验证请求,因此我们有协调器来处理来自不同身份验证源的所有响应,但现在的问题是:所有“SearchStarted”消息不会首先从所有身份验证源到达协调器,因为我们无法检查有多少身份验证源开始进行身份验证。

尝试以不同的方式发送消息,例如 1. 使用SEND方法代替REPLY方法。 2. 在发送响应之前尝试使用不带await 关键字。 3.尝试了.Wait()方法和Send/Reply方法。

身份验证协调器:

public class AuthenticationSaga : Saga<AuthenticationSagaData>, IAmInitiatedBy<AuthenticationRequest>, IHandleMessages<SearchStarted>, IHandleMessages<SearchCompleted>, IHandleMessages<AuthenticationResponse>
{
    private readonly IBus _bus;

    public IBus Bus
    {
        get { return _bus; }
    }

    public AuthenticationSaga(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        if (!IsNew) return;

        Data.Id = new Guid(MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId]);
        Data.ReturnAddress = MessageContext.Current.Headers[Rebus.Messages.Headers.ReturnAddress];
        message.UniqueId = Data.Id.ToString();
        Data.RequestMessage = message;
        Bus.Publish(message);
    }

    public async Task Handle(SearchStarted message)
    {

    }

    public async Task Handle(SearchCompleted message)
    {

    }

    public async Task Handle(AuthenticationResponse message)
    {

    }

    protected override void CorrelateMessages(ICorrelationConfig<AuthenticationSagaData> config)
    {
        config.Correlate<AuthenticationRequest>(m => m.UniqueId, d => d.Id);
        config.Correlate<SearchStarted>(m => m.UniqueId, d => d.Id);
        config.Correlate<AuthenticationResponse>(m => m.UniqueId, d => d.Id);
        config.Correlate<SearchCompleted>(m => m.UniqueId, d => d.Id);
    }
}

身份验证Ldap:

public class AuthenticationLdapHandler : IHandleMessages
{
    private readonly IBus _bus;

    public IBus bus
    {
        get { return _bus; }
    }

    public AuthenticationLdapHandler(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

        var response = AuthenticateLdap(message); await bus.Reply(response);

        await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

    }
}

身份验证 native :

public class AuthenticationNativeHandler : IHandleMessages
{
    private readonly IBus _bus;

    public IBus bus
    {
        get { return _bus; }
    }

    public AuthenticationNativeHandler(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

        var response = AuthenticateNative(message); await bus.Reply(response);

        await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

    }
}

我们期待 AuthenticationCoordinator 中的响应如下所示:

  1. 来自 Ldap 的 SearchStarted 消息

  2. 来自 Native 的 SearchStarted 消息

  3. 来自 Ldap 的 AuthenticationResponse 消息
  4. 来自 Ldap 的 SearchCompleted 消息
  5. 来自 Native 的 AuthenticationResponse 消息
  6. 来自 Native 的 SearchCompleted 消息

但现在我们按以下顺序收到响应:

  1. 来自 Ldap 的 SearchStarted 消息
  2. 来自 Ldap 的 AuthenticationResponse 消息
  3. 来自 Ldap 的 SearchCompleted 消息
  4. 来自 Native 的 SearchStarted 消息
  5. 来自 Native 的 AuthenticationResponse 消息
  6. 来自 Native 的 SearchCompleted 消息

我们可以设置消息的优先级吗?我们如何在画画 5 中实现高于预期的响应。

最佳答案

您所看到的很可能是 Rebus 确保在处理程序执行完成后发送所有传出消息的结果。

它通过在其事务上下文中登记所有总线操作来实现此目的,该操作仅在处理程序代码完成后才会提交。

这意味着代码如下

public async Task Handle(string message)
{
    await bus.Reply("this is message 1");

    await Task.Delay(TimeSpan.FromSeconds(1));

    await bus.Reply("this is message 2");

    await Task.Delay(TimeSpan.FromSeconds(1));

    await bus.Reply("this is message 3");
}

当 Rebus 事务上下文提交时,将导致消息 1、2 和 3 同时发送,这意味着接收者将以随机顺序接收它们。

如果您的消息要立即从处理程序发送,您可以像这样“拆除”事务上下文:

var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
    // current transaction will never know....
    await bus.Send(whee);
}
finally
{
    AmbientTransactionContext.SetCurrent(transactionContext);
}

我建议您将其包装在 IDisposable 的实现中,它可以像这样使用:

using(new RebusTransactionContextDismantler())
{
    // current transaction will never know....
    await bus.Publish(whee);
}

关于画画传奇 : Message is not arrived in correct sequence when subscriber(s) sent to publisher,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57575817/

相关文章:

msmq - 使用 rebus 和 MSMQ 取消挂起作业或执行作业的最佳策略是什么? (长时间运行)

c# - Rebus中某种消息类型的串行处理

azure - Rebus Azure ServiceBus - 源自外部服务的消息缺少 MessageID

saga - Rebus Saga 的并发异常

azure - 同一消息的多个订阅者 Rebus Azure 服务总线

c# - 一个工作单元内多个消息的相同 Rebus 处理程序实例

unit-testing - Saga 处理程序在 rebus 和相关性问题中的单元测试

rebus - 如何处理 rebus saga 中的状态转换?

Rebus Windsor 容器和内联消息处理程序

c# - 使用 Rebus 作为 ESB