c# - 如何在 MassTransit 3.0 中使用分散/聚集模式实现传奇

标签 c# masstransit saga

Jimmy Boagard 描述了一家麦当劳快餐链式店 here将其与 scatter gather pattern. 进行比较

从上述文章中窃取的工作流图像:enter image description here

初步实现思路:

为所有食品站将获得的所有类型的 FoodOrdered 事件提供一个通用接口(interface),然后每个食品站将能够消费/创建其各自的项目并发布一个通用的完成事件。例如:炸薯条和汉堡站收到有关炸薯条订单的消息,炸薯条站消费该订单会宣布 saga 正在监听的 ItemDoneEvent。

最初的担忧:

由于 Saga 不关心完成的食物类型,只关心所有食物都完成这一事实,这似乎是一个OK解决方案。然而阅读警告后here关于队列共享并注意到 Consumer.Conditional filtering has been removed with MassTransit 3.0感觉好像框架是在用这种方法说“Bad Things(TM) 将会发生”。但是我不确定如果不为厨房中的每个食品创建消息请求和响应以及关联事件,您还会怎么做。例如:FriesOrdered、BurgerOrdered FriesCooked、BurgerCooked。如果你必须为厨房里的每件元素都这样做,那会不会很乏味?

考虑到上述问题 - 这种类型的工作流程的一个好的 saga 示例应该是什么样的?

最佳答案

我遇到了类似的问题 - 需要发布几十个命令(所有相同的界面,IMyRequest)并等待所有。

实际上,我的命令启动了其他 saga,它在处理结束时发布 IMyRequestDone,而不标记 saga 已完成。 (需要在稍后的某个时间完成它们。)因此,我没有在父传奇中保存已完成的嵌套传奇的数量,而是查询子传奇实例的状态。

检查每个 MyRequestDone 消息:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        )
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)

定期检查所有请求是否已完成(通过“减少 NServiceBus Saga 负载”):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Recieved)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)           
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)              
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed));

关于c# - 如何在 MassTransit 3.0 中使用分散/聚集模式实现传奇,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33579533/

相关文章:

c# - 使用 Entity Framework 和 MySQL 实现 MassTransit saga 持久性

asp.net-core - 具有 Redis 持久性的 MassTransit saga 给出了 Method Accpet does not have an implementation exception

java - android中通过service实现sip

azureservicebus - MassTransit - 配置 Azure 服务总线消息传递重试

c# - 执行后是否可以播放视频而不复制到客户端机器..?

java - 如何将 Axon4 中的事件重播/转换到不同的上下文中?

javascript - Redux-saga put方法不触发reducer

cqrs - 如何在axon框架中处理saga发送的命令

c# - Update-Plugin CRM 2011 上的前后实体图像

c# - 覆盖非虚拟属性