c# - MassTransit saga 运行时预取 > 1

标签 c# masstransit saga

我有一个 MassTransit saga 状态机(源自 Automatonymous.MassTransitStateMachine),并且我正在尝试解决仅当我将端点配置 prefetchCount 设置为大于 1 的值时才会出现的问题。

问题在于“StartupCompletedEvent”已发布,然后在 saga 状态保存到数据库之前立即进行处理。

状态机配置如下:

State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);

Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));


Initially(
    When(Requested)
        .ThenAsync(async ctx => 
        {
          Console.WriteLine("Starting up...");
          await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId }));
          Console.WriteLine("Done starting up...");
        }
        .TransitionTo(StartingUp)
);


During(StartingUp,
    When(StartupCompleted)
        .ThenAsync(InitialiseSagaInstanceData)
        .TransitionTo(Initialising)
);

// snip...!

当我的传奇收到 Requested 事件时会发生什么:

  1. Initially block 的 ThenAsync 处理程序被命中。此时,没有任何 saga 数据会持久保存到存储库中(如预期)。
  2. StartupCompletedEvent 已发布到总线。这里也没有将传奇数据持久保存到存储库中。
  3. Initially 声明的 ThenAsync block 完成。至此,saga数据终于被持久化了。
  4. 没有发生其他事情。

此时,队列中没有消息,StartupCompletedEvent 丢失。但是,数据库中有一个 saga 实例。

我已经尝试过启动并确定其他线程之一(因为我的预取> 1)已获取该事件,在数据库中未找到任何具有correlationId的传奇,并丢弃了该事件。因此,在传奇有机会持久之前,事件已被发布和处理。

如果我将以下内容添加到初始处理程序中:

When(StartupCompleted)
    .Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))

然后我开始执行 Console.WriteLine。我对此的理解是,该事件已被接收,但被路由到初始处理程序,因为correlationId 不存在任何传奇。如果我在此时放置一个断点并检查 saga 存储库,则还没有 saga。

可能还值得一提的其他几点:

  1. 我将 saga 存储库上下文设置为使用 IsolationLevel.Serialized
  2. 我正在使用 EntityFrameworkSagaRepository
  3. 当预取计数设置为 1 时,一切都会按预期运行
  4. 我使用 Ninject 进行 DI,并且我的 SagaRepository 是线程范围的,因此我想象预取计数允许的每个处理程序都有自己的 saga 存储库副本
  5. 如果我在一个单独的线程中发布 StartupCompletedEvent,并且之前有 1000 毫秒的 sleep 时间,那么一切都会正常工作。我认为这是因为 saga 存储库已完成保存 saga 状态,因此当事件最终发布并由处理程序拾取时,将从存储库中正确检索 saga 状态。

如果我遗漏了任何内容,请告诉我;我试图提供我认为有值(value)的一切,但又不会让这个问题变得太长......

最佳答案

我也遇到了这个问题,我想将 Chris 的评论作为答案发布,以便人们可以找到它。

解决方案是启用发件箱,以便保留消息,直到 saga 持久化为止。

c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
}

关于c# - MassTransit saga 运行时预取 > 1,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38716143/

相关文章:

c# - .NET Remoting SSL 要求

c# - 如何将 List<AnonymousType#1> 转换为 List<Model>?

c# - 如何使用公共(public)交通和消息数据干净地注册类

CQRS:通过命令而不是事件启动进程管理器

c# - 如何共享远程文件夹?

c# - 获取磁盘上文件的大小

c# - MassTransit 3,其中 AzureServiceBus 在队列名称前添加斜杠

java - session 事务消费者或生产者中的消息代理异常处理

apache-kafka - 用Kafka实现Sagas