最近,我们将 rebus 版本从 1 升级到 5,之后我们遇到了 Saga 处理程序的问题。现在我们没有从不同的订阅者处以正确的顺序获得响应。
我们有不同的源来验证请求,因此我们有协调器来处理来自不同身份验证源的所有响应,但现在的问题是:所有“SearchStarted”消息不会首先从所有身份验证源到达协调器,因为我们无法检查有多少身份验证源开始进行身份验证。
尝试以不同的方式发送消息,例如 1. 使用SEND方法代替REPLY方法。 2. 在发送响应之前尝试使用不带await 关键字。 3.尝试了.Wait()方法和Send/Reply方法。
身份验证协调器:
public class AuthenticationSaga : Saga<AuthenticationSagaData>, IAmInitiatedBy<AuthenticationRequest>, IHandleMessages<SearchStarted>, IHandleMessages<SearchCompleted>, IHandleMessages<AuthenticationResponse>
{
private readonly IBus _bus;
public IBus Bus
{
get { return _bus; }
}
public AuthenticationSaga(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
if (!IsNew) return;
Data.Id = new Guid(MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId]);
Data.ReturnAddress = MessageContext.Current.Headers[Rebus.Messages.Headers.ReturnAddress];
message.UniqueId = Data.Id.ToString();
Data.RequestMessage = message;
Bus.Publish(message);
}
public async Task Handle(SearchStarted message)
{
}
public async Task Handle(SearchCompleted message)
{
}
public async Task Handle(AuthenticationResponse message)
{
}
protected override void CorrelateMessages(ICorrelationConfig<AuthenticationSagaData> config)
{
config.Correlate<AuthenticationRequest>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchStarted>(m => m.UniqueId, d => d.Id);
config.Correlate<AuthenticationResponse>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchCompleted>(m => m.UniqueId, d => d.Id);
}
}
身份验证Ldap:
public class AuthenticationLdapHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationLdapHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateLdap(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
身份验证 native :
public class AuthenticationNativeHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationNativeHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateNative(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
我们期待 AuthenticationCoordinator 中的响应如下所示:
来自 Ldap 的 SearchStarted 消息
来自 Native 的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自 Native 的 AuthenticationResponse 消息
- 来自 Native 的 SearchCompleted 消息
但现在我们按以下顺序收到响应:
- 来自 Ldap 的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自 Native 的 SearchStarted 消息
- 来自 Native 的 AuthenticationResponse 消息
- 来自 Native 的 SearchCompleted 消息
我们可以设置消息的优先级吗?我们如何在画画 5 中实现高于预期的响应。
最佳答案
您所看到的很可能是 Rebus 确保在处理程序执行完成后发送所有传出消息的结果。
它通过在其事务上下文中登记所有总线操作来实现此目的,该操作仅在处理程序代码完成后才会提交。
这意味着代码如下
public async Task Handle(string message)
{
await bus.Reply("this is message 1");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 2");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 3");
}
当 Rebus 事务上下文提交时,将导致消息 1、2 和 3 同时发送,这意味着接收者将以随机顺序接收它们。
如果您的消息要立即从处理程序发送,您可以像这样“拆除”事务上下文:
var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
// current transaction will never know....
await bus.Send(whee);
}
finally
{
AmbientTransactionContext.SetCurrent(transactionContext);
}
我建议您将其包装在 IDisposable
的实现中,它可以像这样使用:
using(new RebusTransactionContextDismantler())
{
// current transaction will never know....
await bus.Publish(whee);
}
关于画画传奇 : Message is not arrived in correct sequence when subscriber(s) sent to publisher,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57575817/