state-machine - 如何使用存储引擎持久保存 Saga 实例并避免竞争条件

标签 state-machine masstransit saga automatonymous

我尝试使用RedisSagaRepository持久保存Saga实例;我想在负载平衡设置中运行 Saga,因此无法使用 InMemorySagaRepository。 然而,在我切换之后,我注意到消费者发布的一些事件没有被 Saga 处理。我检查了队列,没有看到任何消息。

我注意到,当消费者几乎没有时间处理命令和发布事件时,很可能会发生这种情况。 如果我使用 InMemorySagaRepository 或在 Consumer.Consume()

中添加 Task.Delay() ,则不会出现此问题

我使用的方式不对吗?

此外,如果我想在负载平衡设置中运行 Saga,并且 Saga 需要使用字典发送多个相同类型的命令来跟踪完整性(与 Handling transition to state for multiple events 中的逻辑类似)。当多个 Consumer 同时发布事件时,如果两个 Sagas 同时处理两个不同的事件,我是否会出现竞争条件?在这种情况下,State 对象中的 Dictionary 会被正确设置吗?

代码可用 here

SagaService.ConfigureSagaEndPoint() 是我在 InMemorySagaRepositoryRedisSagaRepository 之间切换的地方

private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
    var stateMachine = new MySagaStateMachine();

    try

    {
        var redisConnectionString = "192.168.99.100:6379";
        var redis = ConnectionMultiplexer.Connect(redisConnectionString);

        ///If we switch to RedisSagaRepository and Consumer publish its response too quick,
        ///It seems like the consumer published event reached Saga instance before the state is updated
        ///When it happened, Saga will not process the response event because it is not in the "Processing" state
        //var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
        var repository = new InMemorySagaRepository<SagaState>();

        endpointConfigurator.StateMachineSaga(stateMachine, repository);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

LeafConsumer.Consume 是我们添加 Task.Delay() 的地方

public class LeafConsumer : IConsumer<IConsumerRequest>
{
    public async Task Consume(ConsumeContext<IConsumerRequest> context)
    {
        ///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
        ///Otherwise, it seems that the Publish message from Consumer will not be processed
        ///If using InMemorySagaRepository, code will work without needing Task.Delay
        ///Maybe I am doing something wrong here with these projects
        ///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
        ///However, we cannot predict latency between Saga and Redis
        //await Task.Delay(1000);

        Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
        await context.Publish<IConsumerProcessed>(new
        {
            context.Message.CorrelationId,
        });
    }
}

最佳答案

当您以这种方式发布事件,并且使用具有非事务性 saga 存储库(例如 Redis)的多个服务实例时,您需要设计您的 saga,以便 Redis 使用和强制执行唯一标识符。这可以防止创建同一传奇的多个实例。

您还需要接受超出“预期”状态的事件。例如,期望在仅在处理中接收另一个事件之前接收一个 Start,这会将 saga 置于处理状态,很可能会失败。建议允许由任何事件序列启动传奇(最初以自动匿名方式),以避免无序消息传递问题。只要事件全部将转盘从左向右移动,就会达到最终的状态。如果在较晚的事件之后接收到较早的事件,则它不应向后移动状态(或在本例中向左移动),而仅向 saga 实例添加信息并将其保留在较晚的状态。

如果在不同的服务实例上处理两个事件,它们都会尝试将 saga 实例插入 Redis,这将因重复而失败。然后,该消息应重试(将 UseMessageRetry() 添加到您的接收端点),然后该消息将拾取现有的 saga 实例并应用该事件。

关于state-machine - 如何使用存储引擎持久保存 Saga 实例并避免竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57709865/

相关文章:

.net - MassTransit Azure ServiceBus 强制连接使用 HTTPS 而不是 AMQP

ios - React Native 应用程序在 Debug模式下工作,但在 iOS 上不工作 Release模式

nservicebus - 有条件地启动 Saga

wpf - 实现 UI 状态机的最佳方式是什么?

java - 如何正确设计具有内部条件转换的游戏状态机

c# - MassTransit - 消费者抛出异常/发布者等待响应超时

azure - 使用 MassTransit 与 Azure 服务总线和 MSI 身份验证时出现 RBAC 问题

events - 这个进程管理器状态应该被持久化吗?

uml - BoUML 的状态机生成器是否正确处理复合状态的退出和进入?

c - 在 C 中实现有限状态机