java - 从外部服务发送命令时,聚合中的 axon 3.4 CommandHandler 不会被触发

标签 java spring-boot jakarta-ee event-sourcing axon

所以我试图了解 axon 3.4 中的分布式命令总线。 我有一个用例,当发送某个命令时,聚合会发送一个启动传奇的事件,该传奇会发送 2 个命令,以保持发送到 2 个不同服务的数据处于一致状态。

现在棘手的部分来了,CommandHandlers 是在外部服务中定义的,这些服务执行某些操作,然后发送回命令以及其中的操作结果。但是,当命令发送时,我总是会遇到超时异常,因此 CommandBus 知道哪个聚合必须处理它,但无法将正确的聚合分配给命令。

目前 commandService.createCurrency 仅记录一条消息,这就是事件处理程序中存在 Thread.sleep 的原因,以模拟更长的运行过程。

您将在下面找到我的代码:

@Configuration
public class AxonConfig {

    @Autowired
    private Registration registration;

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
    }

    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }

}

服务1

聚合:

@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public CreateCurrencyAggregate(CreateCurrencyCommand command) {
        log.info("starting create currency");
        Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
        Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
        this.id = command.getId();
        apply(CreateCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build());
    }

    @CommandHandler
    public void on(DalCreatedCommand command) {
        log.info("Currency created on dal layer");
        apply(DalCurrencyCreatedEvent.builder()
                .dalId(command.getId())
                .build());

    }
}

传奇:

@Slf4j
@Saga
public class CreateCurrencySaga {

    @Autowired
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(CreateCurrencyEvent event) {
        log.info("starting saga...");
        dalCreated = false;
        as400Created = true;
        SagaLifecycle.associateWith("id", event.getId());
        SagaLifecycle.associateWith("dalId", event.getId());
        commandGateway.send(CreateDalCurrencyCommand.builder()
                .id(event.getId())
                .payload(event.getPayload())
                .build());
    }

    @SagaEventHandler(associationProperty = "dalId")
    public void handle(DalCurrencyCreatedEvent event) {
        log.info("receiving createdEvent");
        SagaLifecycle.end();
    }


}

服务2

外部命令处理程序

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

    @Autowired
    private EventBus eventBus;

    @CommandHandler
    public void on(CreateDalCurrencyCommand command) {
        eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build()));
    }
}

事件处理程序

@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

    private final CurrencyCommandService commandService;

    private final CommandGateway commandGateway;

    @EventHandler
    public void handle(CreateDalCurrencyEvent event){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        commandService.createCurrency(event.getId(), event.getPayload());
        var result = commandGateway.send(DalCreatedCommand.builder()
            .id(event.getId())
            .build());
    }
}

最佳答案

我想我可以为您提供一些该领域的补充背景知识。

遗憾的是,用作发现服务的 Spring Cloud 的实现带来了天壤之别。 在内部,SpringCloudCommandRouter 使用ServiceInstance 的元数据来共享MessageRoutingInformation。连接到您的设置的每个应用程序都将由 ServiceInstance 表示,因此共享您作为服务可以通过此方法处理的消息(因此也是命令)将很简单。

但是,当构建 SpringCloudCommandRouter 时,这是通过利用 Eureka 作为 Spring Cloud 实现进行测试的。 Eureka 允许调整 ServiceInstance 的元数据,因此我可以相当有信心地说,如果您使用 Spring Cloud Eureka,我希望事情能像它那样工作。

但是,如果您使用 Consul,那就是另一回事了。 Spring Cloud Consul 不允许调整 ServiceInstance 的元数据。我创建了一个issue过去将 API 调整为实际上能够更新元数据。

无论如何,Axon Framework 已通过提供 SpringCloudHttpBackupCommandRouter 解决了为 Spring Cloud Consul 和其他不允许调整元数据的实现提供支持的问题。

因此,我建议将您的配置调整为使用 SpringCloudHttpBackupCommandRouter 而不是“SpringCloudCommandRouter”

关于java - 从外部服务发送命令时,聚合中的 axon 3.4 CommandHandler 不会被触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56165811/

相关文章:

java jdbc 类型 1 驱动程序 : thread safe or not?

java - mvn 发布 :prepare in spring boot 时无法将文件提交到 github

frameworks - Struts2配置和性能

java - Spring Boot 的 @RequestBody 中的 JSON 主体

java - Spring Boot 应用程序中日志记录概念的行数

java - 将所有实体保留在一个 EJB 模块中

java - DispatcherServlet 是否为每个用户创建新的 Controller 实例和方法?

java - Camunda 在启动流程时发送列表作为流程变量

Java BigDecimal : How to set scale only if it's greater than certain precision point?

java - Struts 2 中的重定向