我有一个问题,我必须在 Spring-Boot 中使用 Atomikos 和 Axon 框架(没有 Axon 服务器)。我正在使用 Oracle DB,并且使用多个线程 (10) 发送大量命令,在此之前我正在为自己配置 JtaTransactionManager,但在某些线程中我遇到这种异常:javax.transaction.xa。 XAException,引发 -6 或 -4 或 -3 或 ORA-02056: 2PC: k2lcom: 来自坐标的错误两相命令号 rdonly:。当我调试时,我发现 CommandGateWay 也在使用 JtaTransactionManager。这样对吗?什么时候开放交易?我的 JtaTransactionManager 和 Axon 是否有冲突? 有人遇到过这种异常(exception)吗?
示例代码:
@Service
public class CreateEntitiesServiceImpl extends FutureCompleter implements CreateEntitiesService {
private static Logger logger = LoggerHelper.getDeveloperLogger(CreateEntitiesServiceImpl.class);
private final CommandGateway commandGateway;
private final ExecutionUtil executionUtil;
private final MyEntityRepository myEntityRepository;
public CreateEntitiesServiceImpl(CommandGateway commandGateway, ExecutionUtil executionUtil, MyEntityRepository myEntityRepository) {
this.commandGateway = commandGateway;
this.executionUtil = executionUtil;
this.myEntityRepository = myEntityRepository;
}
@Override
public void process(Message message) {
logger.info("Entity addition started!");
generateEntities();
logger.info("Entity addition finished!");
}
private void generateEntities() {
ExecutorService executorService = executionUtil.createExecutor(10, "createEntities");
List<Integer> list = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
CreateEntitiesService proxy = applicationContext.getBean(CreateEntitiesServiceImpl.class);
List<CompletableFuture<Void>> processingFutures = list.stream().map(
e -> CompletableFuture.runAsync(proxy::createEntity, executorService).whenComplete((x, y) -> executorService.shutdown()))
.collect(Collectors.toList());
processingFutures.stream().map(this::getVoidFuture).collect(Collectors.toList());
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void createEntity() {
try {
MyEntity myEntity = new MyEntity();
myEntity.setEntityStringProperty("string");
myEntity.setEntityTimestampProperty(LocalDateTime.now());
MyEntity savedEntity = myEntityRepository.save(myEntity);
CreateAggregateCommand command = new CreateAggregateCommand(savedEntity.getEntityId(), savedEntity.getEntityStringProperty(),
savedEntity.getEntityTimestampProperty());
commandGateway.send(command);
} catch (Exception e) {
throw new CreateEntitiesException(e.getMessage(), e);
}
}
}
谢谢
最佳答案
老实说,我不完全理解你的要求。
标题指出 Axon 与 Atomikos,但我不认为这一点会在描述中出现。
您在问题描述中询问的是CommandGateway
使用事务管理器是否正确。
在这个问题上我可以明确地说:是的。 发送命令很可能会让您进入聚合实例。由于您希望保护此实例内的一致性边界并确保存储发布的事件,因此明智的做法是在处理命令后立即启动事务。
顺便说一句,小提示:CommandBus
使用了 TransactionManager
。从概念上讲,到目前为止,这并没有改变我的描述。
最后,我不完全确定这是否会对您有所帮助,因为我对您的问题并不完全清楚。我希望重写您的帖子能够更清楚地说明您真正想要的内容。
关于java - Axon 框架与 Atomikos?工作怎么样?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60390788/