使用 saga,给定一个事件 EventA,saga 启动,它发送一个命令(或多个)。 如何确保命令发送成功然后其他微服务中的实际逻辑没有抛出等等。
让我们看一个电子邮件传奇的例子: 当用户注册时,我们创建一个发布 UserRegisteredEvent 的用户聚合,将创建一个 saga,该 saga 负责确保注册电子邮件发送给用户(电子邮件可能包含验证 key 、欢迎消息等)。
我们应该使用:
commandGateway.sendAndWait
带有 try/catch -> 它可以扩展吗?commandGateway.send
并使用截止日期并使用某种“失败事件”,例如 SendEmailFailedEvent -> 需要为命令关联一个“ token ”,以便可以将“associationProperty”与正确的传奇 发送 SendRegistrationEmailCommandcommandGateway.send(...).handle(...)
-> 在句柄中我们可以引用 MyEmailSaga 中的 eventGateway/commandGateway 吗? 如果出错我们发送一个事件?或者我们可以修改/调用我们拥有的 saga 实例中的方法吗?如果没有错误,则其他服务已发送“RegistrationEmailSentEvent”等事件,因此传奇将结束。使用截止日期,因为我们只使用“发送”,不处理可能发送失败的命令的最终错误(其他服务关闭等)
还有什么吗?
或者所有的组合?
下面的错误如何处理? (使用截止日期或 .handle(...) 或其他)
错误可能是:
命令没有处理程序(没有服务启动等)
命令已处理,但在其他服务中引发异常并且未发送任何事件(其他服务中没有 try/catch)
命令已处理,异常引发并捕获,其他服务发布事件以通知其发送电子邮件失败(saga 将接收事件并根据事件类型和提供的数据执行适当的操作 -> 可能电子邮件错误或不存在,无需重试)
我错过的其他错误?
@Saga
public class MyEmailSaga {
@Autowired
transient CommandGateway commandGateway;
@Autowired
transient EventGateway eventGateway;
@Autowired
transient SomeService someService;
String id;
SomeData state;
/** count retry times we send email so can apply logic on it */
int sendRetryCount;
@StartSaga
@SagaEventHandler(associationProperty = "id")
public void on(UserRegisteredEvent event) {
id = event.getApplicationId();
//state = event........
// what are the possibilities here?
// Can we use sendAndWait but it does not scale very well, right?
commandGateway.send(new SendRegistrationEmailCommand(...));
// Is deadline good since we do not handle the "send" of the command
}
// Use a @DeadlineHandler to retry ?
@DeadlineHandler(deadlineName = "retry_send_registration_email")
fun on() {
// resend command and re-schedule a deadline, etc
}
@EndSaga
@SagaEventHandler(associationProperty = "id")
public void on(RegistrationEmailSentEvent event) {
}
}
编辑(接受答案后):
主要有两个选项(抱歉,下面是 kotlin 代码):
第一个选项
commandGateway.send(SendRegistrationEmailCommand(...))
.handle({ t, result ->
if (t != null) {
// send event (could be caught be the same saga eventually) or send command or both
}else{
// send event (could be caught be the same saga eventually) or send command or both
}
})
// If not use handle(...) then you can use thenApply as well
.thenApply { eventGateway.publish(SomeSuccessfulEvent(...)) }
.thenApply { commandGateway.send(SomeSuccessfulSendOnSuccessCommand) }
第二个选项: 使用截止时间来确保 saga 在 SendRegistrationEmailCommand 失败并且您没有收到任何有关失败的事件(当您不处理发送的命令时)时执行某些操作。
当然可以将截止日期用于其他目的。
成功接收到 SendRegistrationEmailCommand 后,接收者将发布一个事件,以便 saga 收到通知并对其采取行动。 可能是 RegistrationEmailSentEvent 或 RegistrationEmailSendFailedEvent。
摘要:
似乎最好仅在命令发送失败或接收者抛出意外异常时才使用handle()
,如果是这样,则发布一个事件让saga对其进行操作。
如果成功,接收者应该发布事件,saga 将监听它(并最终注册一个截止日期以防万一);接收者也可以发送事件来通知错误并且不要抛出,saga也会监听这个事件。
最佳答案
理想情况下,您可以使用异步选项来处理错误。这可以是 commandGateway.send(command)
或 commandGateway.send(command).thenApply()
。如果故障与业务逻辑相关,那么针对这些故障发出事件可能是有意义的。一个简单的gateway.send(command)
就有意义了; Saga 可以对返回的事件使用react。否则,您将不得不处理命令的结果。
是否需要使用 sendAndWait
还是仅使用 send().then...
取决于失败时需要执行的事件。不幸的是,当异步处理结果时,您无法再安全地修改 Saga 的状态。轴突可能已经持续了传奇的状态,导致这些变化消失。 sendAndWait
解决了这个问题。可扩展性通常不是问题,因为不同的 Sagas 可以并行执行,具体取决于您的处理器配置。
Axon 团队目前正在寻找可能的 API,这些 API 允许在 Sagas 中安全异步执行逻辑,同时仍然保证线程安全和状态持久性。
关于cqrs - 如何在axon框架中处理saga发送的命令,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58676825/