saga - Rebus Saga 的并发异常

标签 saga rebus

在遵循 Rebus 文档中的 Crm 系统 Saga 示例时,在处理事件 LegalInfoAcquiredInFirstSystem 和 LegalInfoAcquiredInSecondSystem 时,我收到了带有以下详细信息的 ConcurrencyException。

错误详情如下

消息 ID:150d3d50-1de4-4a2f-bd60-660fc441412e 交付模式:2 标题:
rbs2-内容类型:application/json;charset=utf-8 rbs2-corr-id: 0848197f-3ebb-442a-a80b-49c4c30dc0ca rbs2-corr-seq: 2 rbs2-error-details: System.AggregateException: 1 个未处理的异常(ID 为 95395c60-cf2e-48da-89ff-fdf192ce53b9 的 saga 更新未成功,因为其他人抢先了我们) ---> Rebus.Exceptions.ConcurrencyException:ID 为 95395c60-cf2e-48da-89ff-fdf192ce53b9 的 saga 更新未成功,因为其他人抢先了我们 在Rebus.SqlServer.Sagas.SqlServerSagaStorage.Update(ISagaData sagaData,IEnumerable1correlationProperties)在Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, bool 插入)在Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, bool 插入) ) 在 Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func1 next) 在 Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func1 next) 在Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next,字符串identifierToTrackMessageBy,ITransactionContext transactionContext,字符串messageId,字符串secondLevelMessageId) --- 内部异常堆栈跟踪结束 --- rbs2-意图: pub rbs2-msg-id:150d3d50-1de4-4a2f-bd60-660fc441412e rbs2-msg-类型:Crm.Messages.Events.LegalInfoAcquiredInFirstSystem、Crm.Messages.Events rbs2-返回地址:RebusQueue rbs2-发送者-地址:RebusQueue rbs2-senttime: 2021-08-07T23:27:49.8601606+03:00 rbs2-source-queue:RebusQueue 内容编码:utf-8 内容类型:应用程序/json 有效载荷 100字节 编码:字符串 {“$type”:“Crm.Messages.Events.LegalInfoAcquiredInFirstSystem,Crm.Messages.Events”,“CorrId”:“70001”}

我的Rebus配置如下:

``

     public void ConfigureServices(IServiceCollection services)
     {

        AppSettings settings = new AppSettings();
        Configuration.Bind(settings);
        services.AutoRegisterHandlersFromAssemblyOf<AcquireLegalInformationFromFirstSystemHandler>();
        services.AddControllers();
        services.AddLogging(logging => logging.AddConsole());
        services.AddRebus((configure, serviceProvider) =>  configure
                .Transport(x =>
                {
                    x.UseRabbitMq($"amqp://{settings.Settings.UserName}:{settings.Settings.Password}@{settings.Settings.HostName}", settings.Settings.EndpointQueueName);

                })
                .Options(o => o.SetBusName("RebusSaga"))
                .Options(o => o.SimpleRetryStrategy(errorQueueAddress: settings.Settings.ErrorQueueName, maxDeliveryAttempts: 1, secondLevelRetriesEnabled: false))
                .Sagas(s => 
                {
                    s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Sagas", "SagaIndex", true);
                    

                })
                .Timeouts(s => s.StoreInSqlServer(settings.ConnectionStrings.RebusContext, "Timeouts",true))
                
                .Routing(r => r.TypeBased()
                .MapAssemblyOf<Crm.Messages.Events.CustomerCreated>(settings.Settings.EndpointQueueName)
                .MapFallback("RebusErrors"))

                );

       

    }

``

错误发生在下面的 Saga 处理程序中

``

    public async Task Handle(LegalInfoAcquiredInFirstSystem first)
    {
        Data.GotLegalInfoFromFirstSystem = true;

        await PossiblyPerformCompleteAction();
    }

    public async Task Handle(LegalInfoAcquiredInSecondSystem first)
    {
        Data.GotLegalInfoFromSecondSystem = true;

        await PossiblyPerformCompleteAction();
    }

    async Task PossiblyPerformCompleteAction()
    {
        if (Data.GotLegalInfoFromFirstSystem && Data.GotLegalInfoFromSecondSystem)
        {
            await bus.Publish(new CustomerIsLegallyOk { CrmCustomerId = Data.CrmCustomerId });

            MarkAsComplete();
        }
    }

``

此错误的可能来源是什么。 谢谢

最佳答案

我终于通过添加 s.EnforceExclusiveAccess(); 解决了这个问题到 Saga 选项。

关于saga - Rebus Saga 的并发异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68696165/

相关文章:

c# - Saga 等待状态值

java - session 事务消费者或生产者中的消息代理异常处理

spring-boot - 使用 Spring Boot 进行 Saga 编排

msmq - 如何在 msmq 上使用 rebus 实现竞争消费者模式

c# - 如何调试在 rebus 处理程序中抛出的异常?

c# - Rebus:为每条消息添加用户上下文的建议

react-native - 终极版/传奇 : How to fire an action (put) inside a callback without channels (use sagas as normal generator functions)

unit-testing - Saga 处理程序在 rebus 和相关性问题中的单元测试

rebus - 一个 rebus 进程中的多个输入队列

rebus - 使用 Rebus 将错误消息从错误队列移回原始队列