cqrs - 轴突框架 : Handle only events published by the same JVM instance?

标签 cqrs axon

您好 Axon 框架社区,

我想听听您对如何正确解决以下问题的意见。

我的 Axon 测试设置

  • 同一 Spring Boot 应用程序的两个实例(使用 axon-spring-boot-starter 4.4,不带 Axon Server)
  • 每个实例定期发布相同的事件
  • 两个实例都连接到同一 EventSource(使用 JpaEventStorageEngine 的单个 SQL Server 实例)
  • 每个实例都配置为使用 TrackingEventProcessors
  • 每个实例都注册了相同的事件处理程序

我想要实现的目标

我希望一个实例发布的事件仅由同一个实例处理

If instance1 publishes eventX then only instance1 should handle eventX

到目前为止我已经尝试过

  • 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,在我的例子中,这不是一个选项,因为我们希望可以选择重播事件以重建/添加新的查询模型。
  • 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是这没有奏效。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 不过对此不太确定。
  • 我可以实现一个 MessageHandlerInterceptor,它仅在事件源来自同一实例的情况下继续。这是我到目前为止所实现的并且运行正常: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

有更好的解决方案吗?

我的印象是我当前的解决方案并不是最好的解决方案,因为理想情况下我希望只有属于某个实例的事件处理程序由 TrackingEventProcessor 实例触发。

你会如何解决这个问题?

最佳答案

你在这里遇到的有趣的场景@thowimmer。 我的第一个预感是“使用 SubscribingEventProcessor 代替”。 但是,您指出这不是您的设置中的选项。 我认为,对于处于相同情况的其他人来说,了解为什么这不是一个选择是非常有值(value)的。所以,也许你可以详细说明一下(说实话,我对此也很好奇)。

现在针对您的问题案例,确保事件仅在同一 JVM 内处理。 将起源添加到事件绝对是您可以采取的步骤,因为这允许采用逻辑方式进行过滤。 “此事件是否源自 my.origin()?”如果没有,您只需忽略该事件并完成它,就这么简单。不过,还有另一种方法可以实现这一目标,我稍后会介绍。

但是我认为过滤的地方正是您要寻找的地方。但首先,我想先明确一下为什么您需要进行过滤。正如您所注意到的,TrackingEventProcessor (TEP) 从所谓的 StreamableMessageSource 流式传输事件。 EventStore 是此类StreamableMessageSource 的实现。当您将所有事件存储在同一存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段对其进行过滤。使用 MessageHandlerInterceptor 可以,您甚至可以编写一个 HandlerEnhacnerDefinition ,允许您向事件处理函数添加其他行为。不管怎样,在当前的设置下,过滤需要在某个地方完成。 MessageHandlerInterceptor 可以说是执行此操作的最简单的地方。

但是,有一种不同的方法来处理这个问题。为什么不将您的事件存储分为两个应用程序的两个不同实例?显然他们不需要互相读取,那么为什么要共享同一个事件存储呢?在不了解您的域的进一步背景的情况下,我猜您实际上是在处理驻留在不同 bounded contexts 中的应用程序。 非常简而言之,与应用程序/上下文共享所有内容的兴趣为零,您只需非常有意识地相互共享领域语言的特定部分。

请注意,支持 multiple contexts ,在中间使用单个通信集线器,这正是 Axon Server 可以为您实现的目标。我并不是说你不能自己配置这个,我过去已经这样做过。但是,将这项工作留给其他人或其他人,让您无需配置基础设施,这将节省大量时间。

希望这可以帮助您了解我对此事的一些想法@thowimmer。

关于cqrs - 轴突框架 : Handle only events published by the same JVM instance?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63313532/

相关文章:

event-sourcing - Axon 中命令的授权

rabbitmq - 在消息传递中如何使用关联 ID 的实际示例?

architecture - 使用NServiceBus的多线程CQRS

c# - 查询重复聚合根属性的写入模型

java - Axon 夹具注入(inject)在 @CommandHandler 注解的方法中失败(构造函数除外)

java - 轴突框架 : Saga project with compensation events between two or three microservices

spring - NoHandlerForCommandException 与 axon-spring-boot-starter

php - 在使用 CQRS 的情况下从另一个命令调用一个命令

events - CQRS 中的事件版本控制

java - Axon - 事件处理程序拦截器配置