在遵循 Rebus 文档中的 Crm 系统 Saga 示例时,在处理事件 LegalInfoAcquiredInFirstSystem 和 LegalInfoAcquiredInSecondSystem 时,我收到了带有以下详细信息的 ConcurrencyException。
错误详情如下
消息 ID:150d3d50-1de4-4a2f-bd60-660fc441412e
交付模式:2
标题:
rbs2-内容类型:application/json;charset=utf-8
rbs2-corr-id: 0848197f-3ebb-442a-a80b-49c4c30dc0ca
rbs2-corr-seq: 2
rbs2-error-details: System.AggregateException: 1 个未处理的异常(ID 为 95395c60-cf2e-48da-89ff-fdf192ce53b9 的 saga 更新未成功,因为其他人抢先了我们)
---> Rebus.Exceptions.ConcurrencyException:ID 为 95395c60-cf2e-48da-89ff-fdf192ce53b9 的 saga 更新未成功,因为其他人抢先了我们
在Rebus.SqlServer.Sagas.SqlServerSagaStorage.Update(ISagaData sagaData,IEnumerable1correlationProperties)在Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, bool 插入)在Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, bool 插入) ) 在 Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func
1 next)
在 Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func
1 next)
在 Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func
1 next)
在 Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func
1 next)
在Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next,字符串identifierToTrackMessageBy,ITransactionContext transactionContext,字符串messageId,字符串secondLevelMessageId)
--- 内部异常堆栈跟踪结束 ---
rbs2-意图: pub
rbs2-msg-id:150d3d50-1de4-4a2f-bd60-660fc441412e
rbs2-msg-类型:Crm.Messages.Events.LegalInfoAcquiredInFirstSystem、Crm.Messages.Events
rbs2-返回地址:RebusQueue
rbs2-发送者-地址:RebusQueue
rbs2-senttime: 2021-08-07T23:27:49.8601606+03:00
rbs2-source-queue:RebusQueue
内容编码:utf-8
内容类型:应用程序/json
有效载荷
100字节
编码:字符串
{“$type”:“Crm.Messages.Events.LegalInfoAcquiredInFirstSystem,Crm.Messages.Events”,“CorrId”:“70001”}
我的Rebus配置如下:
``
public void ConfigureServices(IServiceCollection services)
{
AppSettings settings = new AppSettings();
Configuration.Bind(settings);
services.AutoRegisterHandlersFromAssemblyOf<AcquireLegalInformationFromFirstSystemHandler>();
services.AddControllers();
services.AddLogging(logging => logging.AddConsole());
services.AddRebus((configure, serviceProvider) => configure
.Transport(x =>
{
x.UseRabbitMq($"amqp://{settings.Settings.UserName}:{settings.Settings.Password}@{settings.Settings.HostName}", settings.Settings.EndpointQueueName);
})
.Options(o => o.SetBusName("RebusSaga"))
.Options(o => o.SimpleRetryStrategy(errorQueueAddress: settings.Settings.ErrorQueueName, maxDeliveryAttempts: 1, secondLevelRetriesEnabled: false))
.Sagas(s =>
{
s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Sagas", "SagaIndex", true);
})
.Timeouts(s => s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Timeouts",true))
.Routing(r => r.TypeBased()
.MapAssemblyOf<Crm.Messages.Events.CustomerCreated>(settings.Settings.EndpointQueueName)
.MapFallback("RebusErrors"))
);
}
``
错误发生在下面的 Saga 处理程序中
``
public async Task Handle(LegalInfoAcquiredInFirstSystem first)
{
Data.GotLegalInfoFromFirstSystem = true;
await PossiblyPerformCompleteAction();
}
public async Task Handle(LegalInfoAcquiredInSecondSystem first)
{
Data.GotLegalInfoFromSecondSystem = true;
await PossiblyPerformCompleteAction();
}
async Task PossiblyPerformCompleteAction()
{
if (Data.GotLegalInfoFromFirstSystem && Data.GotLegalInfoFromSecondSystem)
{
await bus.Publish(new CustomerIsLegallyOk { CrmCustomerId = Data.CrmCustomerId });
MarkAsComplete();
}
}
``
此错误的可能来源是什么。 谢谢
最佳答案
我终于通过添加 s.EnforceExclusiveAccess(); 解决了这个问题到 Saga 选项。
关于saga - Rebus Saga 的并发异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68696165/