java - 重新启动应用程序时事件处理程序重播?

标签 java spring-boot axon

我在本地玩 axon 服务器。我通过命令 docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver 在我的本地机器上运行一个 docker 容器。

当我启动 spring-boot 应用程序时,聚合外部的事件处理程序重新运行所有以前的事件,因此我在启动时看到了这个日志语句流。

同一事件处理程序还向聚合发布命令,然后聚合将新事件附加到处理命令的聚合。因此,每次我重新启动应用程序时,我的聚合最终都会在它的末尾添加一些事件,而我只需要或期望一个事件。

flsh.axon.LetterSchedulingHandler   : Sending letter 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f...
flsh.axon.LetterSchedulingHandler   : Sending letter 6b4f6966-85ea-46e0-9c49-21bcd501a1b5...
flsh.axon.LetterSchedulingHandler   : Sending letter fc36292f-c7bd-4575-b56f-130624a87466...

flsh.axon.Letter                    : LetterScheduledEvent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SCHEDULED
flsh.axon.Letter                    : LetterScheduledEvent fc36292f-c7bd-4575-b56f-130624a87466 SCHEDULED
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent 0e94035a-ec4b-4fdc-b8b6-4c0bc1927c8f SENT
flsh.axon.Letter                    : Letter sent fc36292f-c7bd-4575-b56f-130624a87466 SENT
flsh.axon.Letter                    : Letter sent 6b4f6966-85ea-46e0-9c49-21bcd501a1b5 SENT

我的事件处理程序如下所示:

@Slf4j
@Component
public class LetterSchedulingHandler {

    private final CommandGateway commandGateway;

    public LetterSchedulingHandler(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @DisallowReplay //this doesn't seem to work
    @EventHandler
    public void handle(BeginSendLetterEvent event) {
        log.info("Sending letter {}...", event.getLetterId());
        commandGateway.send(new LetterSentCommand(event.getLetterId()));
    }
}
@CommandHandler
public void handle(LetterSentCommand cmd) {
    AggregateLifecycle.apply(new LetterSentEvent(cmd.getLetterId()));
}

这些事件通过调度程序发布...通过不同的 @CommandHandler 在聚合中运行,否则会按预期成功运行。

@CommandHandler
public Letter(ScheduleLetterCommand cmd, EventScheduler scheduler) {
    String id = cmd.getLetterId();
    log.info("Received schedule command for letter id {}", id);
    ScheduleToken scheduleToken = scheduler.schedule(Duration.ofSeconds(5), new BeginSendLetterEvent(id));
    AggregateLifecycle.apply(new LetterScheduledEvent(id, scheduleToken));
}

@DisallowReplay 注释似乎无法阻止这种情况。另外,我试着关注 directions here使处理程序成为 Subscribing 事件处理器,但要么我没有做对,要么它也没有解决问题。

axon:
  axonserver:
    servers: localhost
  eventhandling:
    processors:
      LetterSchedulingHandler:
        mode: subscribing

最佳答案

您缺少的@GoldFLsh 是以下两者之一:

  1. 正确引用 LetterSchedulingHandler 背后的 EventProcessor
  2. 定义 LetterSchedulingHandler@ProcessingGroup

您可能在 Event Processors 上的引用指南中读到的内容,因为任何事件处理组件(例如阅读您的 LetterSchedulingHandler)都将与订阅或跟踪事件处理器中的其他事件处理组件分组。

然而,您在此阶段尚未配置任何处理组名称。这意味着 LetterSchedulingHandler 的处理组将默认为处理程序的包名称。因此,在您的属性文件中,您应该使用包名称而不是 LetterSchedulingHandler 将其定义为订阅。

但更清楚的是,在 LetterSchedulingHandler 上添加 @ProcessingGroup 注释,为其提供一个清晰的名称,然后您可以在属性文件中使用它。

最后,您没有正确引用事件处理器这一事实是您会看到事件重播的原因。 Axon 默认为 TrackingEventProcessor,它跟踪处理流事件的进度。如果没有持久化单元来存储这些 TrackingTokens,它将始终从头开始。

关于java - 重新启动应用程序时事件处理程序重播?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59862304/

相关文章:

java - 如何使用 Java spring 提供动态创建的文件?

spring - 如何使用注解配置 PayloadValidatingInterceptor

spring-boot - org.axonframework.eventsourcing.IncompatibleAggregateException(轴突框架 : Aggregate identifier must be non-null after applying an event)

spring - NoHandlerForCommandException 与 axon-spring-boot-starter

java - 如何管理 Spring Batch block 内较短的事务?

java - Intellij Idea Spring 404 – 未找到

java - SpringBoot注入(inject): OK with app,但没有经过测试

java - 将不同类型的事件发布到不同的队列

java - 如何在 Linux JVM 中使用 native Windows DLL

cqrs - Axon Framework与Eventuate的比较