java - 如何将 Axon4 中的事件重播/转换到不同的上下文中?

标签 java cqrs event-sourcing saga axon

我正在构建我的第一个事件源系统。它将有多个域,使用以发布生命周期为核心的项目。如何有效地将两个域的事件重放或重新应用到第三个域内的新聚合?

更具体一点。想象 4 个领域,每个领域都有自己的限界上下文和目的。这些上下文的简短描述:

  • 项目 - 项目是系统核心的复杂对象,几乎每个领域都需要项目数据来运行。一个项目有一个或多个 ProductTypes,其中包含有限的产品供应。
  • 媒体 - 媒体领域涵盖围绕图像、文档和生成的报告的操作,并充当文件服务器。
  • 交付 - 交付允许配置将所有出版物发布到哪些内容 channel 。
  • 发布 - 发布域处理验证项目是否可以在当前状态下发布到请求状态的复杂任务。

发布状态遵循生命周期:概念(尚未发布)> 宣布(可选)> 销售 > 售罄(发布结束)。在我的描述中,我专注于已宣布的状态。对于出版领域来说,概念实际上并不是一个薄薄的东西,因为如果出版还不知道一个项目,它总是处于概念中。


我的第一次尝试是设置一个处理传入事件 AnnouncementPublishedEvent 的普通聚合。这需要一个项目满足一些基本要求,例如“它有一个名称”、“它有一个描述”、“它至少有一个图像”等等。这意味着我需要在应用事件之前验证此信息,因此我需要以某种方式在命令中提供一个 project 实例。

在执行此操作时,我怀疑此方法破坏了 CQRS 的目的,我应该查看真正的数据源:事件。我的下一次尝试是创建一个在事件 AnnouncementPublicationRequestedEvent 时开始的 Saga。此 saga 需要审查在给定 projectId 周围发生了哪些事件,并将这些事件应用于这个新的“已发布项目”投影,以便(至少)验证请求是否可以被接受。

我对跟踪处理器进行了研究和试验,但无法找到在 Axon 版本 4 中如何完成此操作的良好示例。我还开始阅读有关 Stackoverflow 的其他几个问题,这让我觉得我可能需要重新考虑我的方法。


不幸的是,无法共享确切的代码,因为它不是开源的,即使我可以,它也远未达到工作状态。我可以使用示例代码来展示我正在尝试做什么。

@Saga
@ProcessingGroup("AnnouncementPublication")
public class AnnouncementPublicationSaga {

   private static int NUMBER_OF_ALLOWED_IMAGES

   private PublicationId publicationId;
   private ProjectId projectId;
   private int numberOfImages = 0;
   //...other fields

   @StartSaga
   @SagaEventHandler(associationProperty = "projectId")
   public void handle(AnnouncementPublicationRequestedEvent event) {
      publicationId = generatePublicationId();

      //set parameters from event for saga to use
      projectId = event.getProjectId();
      targetPublicationStatus = event.getPublicationStatus();
      date = event.getDate();

      //initialize the 'publicated project' aggregate
      //start a replay of associated events for this @ProcessingGroup
   }

   ...

   @SagaEventHandler(associationProperty = "projectId")
   public void handle(ProjectCreatedEvent event) {
      //Verify the project exists and has a valid name
   }

   ...

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageAdded event) {
      numberOfImages += 1;
   }

   /* Assumption* on how AssociationResolver works: */
   @SagaEventHandler(AssociationResolver=MediaProjectAssociator.class )
   public void handle(ProjectImageRemoved event) {
      numberOfImages -= 1;
   }

   ...

   /* In my head this should trigger if all events have been played
      up to the PublicationRequestedEvent. Or maybe 
   */
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(ValidationRequestCompleted event) {
      //ValidationResult result = ValidationResult.builder();
      ...
      if (numberOfImages > NUMBER_OF_ALLOWED_IMAGES) {
         //reason to trigger PublicationRequestDeniedEvent
         //update validationResult
      }
      ...
      if (validationResult.isAcceptable()) {
         //Trigger AnnouncementPublicationAcceptedEvent
      } else {
         //Trigger AnnouncementPublicationDeniedEvent
      }
   }

   ...

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationDeniedEvent event) {
      //do stuff to inform why the publication failed
   }

   @EndSaga
   @SagaEventHandler(associationProperty = "publicationId")
   public void handle(AnnouncementPublicationAcceptedEvent event){
      //do stuff to notify success to user
      //choice: delegate to delivery for actual sharing of data
      //    or  delivery itselfs listens for these events
   }
}

*associationResolver 代码是对其实际工作的假设,因为我还没有接近那部分。我的媒体上下文使用文件 ID 作为聚合标识符,因为并非每个事件都绑定(bind)到项目。但是这个传奇需要重播的所有媒体事件都将有一个 projectId 作为其中的字段。欢迎对此提出任何反馈,但这不是我现在的主要问题。


最终的结果应该是:发布记录或尝试记录以及失败原因。

出版物的记录包含与出版物相关的 projectmedia 事件的所有数据。这主要是潜在买家需要做出决定的信息。

出于这个问题的目的,我不希望上述问题得到完全解决。我只想知道我在思考事件时是否走在正确的轨道上,我重播相关事件的方法是否正确,如果是这样,如何在 Axon4 中完成。

最佳答案

根据 Martin 的问题描述,我假设您有几个不同的限界上下文。按照限界上下文的定义:

Explicitly define the context within which a model applies.

Explicitly set boundaries in terms of team organization, usage within specific parts of the application, and physical manifestations such as code bases and database schemas.

Keep the model strictly consistent within these bounds, but don’t be distracted or confused by issues outside.

由此我想强调的是,在给定的限界上下文中,您对任何组件都使用相同的语言/API。 然而,在上下文之间,您将非常有意识地共享,使用专用的上下文映射(例如反腐败层)来确保另一个域不会进入您的域。

综上所述,事件是特定限界上下文的一部分。 因此,理想情况下,使用来自其他上下文的多个事件流在另一个上下文中重新创建/重放聚合应该是不可能的。

除此之外,在 Axon 中,聚合只能根据自己发布的事件重新创建。

要仍然得出一个解决方案,即给定应用程序从其他应用程序中摄取事件以重新聚合聚合,我将采取以下步骤:

  • 拥有一个专用组件(例如反腐败层),可将传入事件转换为应用程序中不同形式的消息。
  • 如果这些事件导致聚合的重建,您需要将事件转换为命令。在谈论 CQRS 时,Axon 中的聚合基础架构组件适用于命令模型。
  • Aggregate 然后会处理命令、执行一些业务逻辑并作为结果发布一个(或多个)事件。
  • 从现在开始,框架将处理重播给定聚合的所有事件,前提是您遵循事件溯源实践来更新聚合的状态。

最后,我想指出,Axon 提供的与 TrackingEventProcessor 相关的重放的任何细节都用于 CQRS 应用程序查询端的事件处理。

希望这能为您澄清一切,马丁!如果没有,请随时在此答案下发表评论,我会相应地更新我的回复。

关于java - 如何将 Axon4 中的事件重播/转换到不同的上下文中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58136910/

相关文章:

java - 线程的一些问题

java - 将 datagramsocket 绑定(bind)到 inetaddress

java - 另一个 "Access denied for user"错误..但我可以从命令行直接登录到远程服务器

node.js - Wolkenkit 事件采购有调度程序吗?

c# - CQRS 中的验证和同步命令

java - 实现 CQRS 模式时如何使用 Spring 处理 JWT 身份验证?

Azure服务总线: How to achieve eventual consistency when a subscriber goes down

java - 打开和创建函数 OPCPackage 有什么区别

import - CQRS/ES : Bulk operations/imports

python - 在 python 中使用游戏树时存储 2048 板游戏状态的最佳方法