cqrs - 如何在axon框架中处理saga发送的命令

标签 cqrs saga axon

使用 saga,给定一个事件 EventA,saga 启动,它发送一个命令(或多个)。 如何确保命令发送成功然后其他微服务中的实际逻辑没有抛出等等。

让我们看一个电子邮件传奇的例子: 当用户注册时,我们创建一个发布 UserRegisteredEvent 的用户聚合,将创建一个 saga,该 saga 负责确保注册电子邮件发送给用户(电子邮件可能包含验证 key 、欢迎消息等)。

我们应该使用:

  1. commandGateway.sendAndWait 带有 try/catch -> 它可以扩展吗?

  2. commandGateway.send 并使用截止日期并使用某种“失败事件”,例如 SendEmailFailedEvent -> 需要为命令关联一个“ token ”,以便可以将“associationProperty”与正确的传奇 发送 SendRegistrationEmailCommand

  3. commandGateway.send(...).handle(...) -> 在句柄中我们可以引用 MyEmailSaga 中的 eventGateway/commandGateway 吗? 如果出错我们发送一个事件?或者我们可以修改/调用我们拥有的 saga 实例中的方法吗?如果没有错误,则其他服务已发送“RegistrationEmailSentEvent”等事件,因此传奇将结束。

  4. 使用截止日期,因为我们只使用“发送”,不处理可能发送失败的命令的最终错误(其他服务关闭等)

  5. 还有什么吗?

  6. 或者所有的组合?

下面的错误如何处理? (使用截止日期或 .handle(...) 或其他)

错误可能是:

  • 命令没有处理程序(没有服务启动等)

  • 命令已处理,但在其他服务中引发异常并且未发送任何事件(其他服务中没有 try/catch)

  • 命令已处理,异常引发并捕获,其他服务发布事件以通知其发送电子邮件失败(saga 将接收事件并根据事件类型和提供的数据执行适当的操作 -> 可能电子邮件错误或不存在,无需重试)

  • 我错过的其他错误?

@Saga
public class MyEmailSaga {

    @Autowired
    transient CommandGateway commandGateway;


    @Autowired
    transient EventGateway eventGateway;

    @Autowired
    transient SomeService someService;

    String id;
    SomeData state;
    /** count retry times we send email so can apply logic on it */
    int sendRetryCount;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(UserRegisteredEvent event) {
        id = event.getApplicationId();
        //state = event........
        // what are the possibilities here? 
        // Can we use sendAndWait but it does not scale very well, right?
        commandGateway.send(new SendRegistrationEmailCommand(...));
        // Is deadline good since we do not handle the "send" of the command
    }

    // Use a @DeadlineHandler to retry ?

    @DeadlineHandler(deadlineName = "retry_send_registration_email")
    fun on() {
         // resend command and re-schedule a deadline, etc
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(RegistrationEmailSentEvent event) {

    }

}

编辑(接受答案后):

主要有两个选项(抱歉,下面是 kotlin 代码):

第一个选项

commandGateway.send(SendRegistrationEmailCommand(...))
    .handle({ t, result ->
    if (t != null) {
       // send event (could be caught be the same saga eventually) or send command or both
    }else{
       // send event (could be caught be the same saga eventually) or send command or both
    }
    })
// If not use handle(...) then you can use thenApply as well
    .thenApply { eventGateway.publish(SomeSuccessfulEvent(...)) }
    .thenApply { commandGateway.send(SomeSuccessfulSendOnSuccessCommand) }

第二个选项: 使用截止时间来确保 saga 在 SendRegistrationEmailCommand 失败并且您没有收到任何有关失败的事件(当您不处理发送的命令时)时执行某些操作。

当然可以将截止日期用于其他目的。

成功接收到 SendRegistrationEmailCommand 后,接收者将发布一个事件,以便 saga 收到通知并对其采取行动。 可能是 RegistrationEmailSentEvent 或 RegistrationEmailSendFailedEvent。

摘要:

似乎最好仅在命令发送失败或接收者抛出意外异常时才使用handle(),如果是这样,则发布一个事件让saga对其进行操作。 如果成功,接收者应该发布事件,saga 将监听它(并最终注册一个截止日期以防万一);接收者也可以发送事件来通知错误并且不要抛出,saga也会监听这个事件。

最佳答案

理想情况下,您可以使用异步选项来处理错误。这可以是 commandGateway.send(command)commandGateway.send(command).thenApply()。如果故障与业务逻辑相关,那么针对这些故障发出事件可能是有意义的。一个简单的gateway.send(command)就有意义了; Saga 可以对返回的事件使用react。否则,您将不得不处理命令的结果。

是否需要使用 sendAndWait 还是仅使用 send().then... 取决于失败时需要执行的事件。不幸的是,当异步处理结果时,您无法再安全地修改 Saga 的状态。轴突可能已经持续了传奇的状态,导致这些变化消失。 sendAndWait 解决了这个问题。可扩展性通常不是问题,因为不同的 Sagas 可以并行执行,具体取决于您的处理器配置。

Axon 团队目前正在寻找可能的 API,这些 API 允许在 Sagas 中安全异步执行逻辑,同时仍然保证线程安全和状态持久性。

关于cqrs - 如何在axon框架中处理saga发送的命令,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58676825/

相关文章:

java - 轴突框架 : Saga project with compensation events between two or three microservices

从域模型中读取 CQRS?

c# - 域事件处理程序什么时候开始发挥作用?

ios - React Native 应用程序在 Debug模式下工作,但在 iOS 上不工作 Release模式

domain-driven-design - 轴突框架 : send command on aggregate load

python - 我如何在没有Restful Api的情况下使用Elasticsearch

node.js - 在发布事件之前如何确保聚合存在?

state-machine - 如何使用存储引擎持久保存 Saga 实例并避免竞争条件

java - Axon 框架的真实生活体验