java - 无法使用ChainedKafkaTransaction同步Kafka和MQ事务

标签 java spring-boot apache-kafka transactions spring-kafka

我们有一个 Spring Boot 应用程序,它使用来自 IBM MQ 的消息进行一些转换并将结果发布到 Kafka 主题。我们使用https://spring.io/projects/spring-kafka为了这。我知道Kafka不支持XA;但是,在文档中,我发现了一些有关使用 ChainedKafkaTransactionManager 链接多个事务管理器并同步事务的输入。该文档还提供了一个示例,说明如何在从 Kafka 读取消息并将其存储到数据库时同步 Kafka 和数据库。

在我的案例中,我遵循相同的示例,并将 JmsTransactionManagerKafkaTransactionManager 链接在 ChainedKafkaTransactionManager 的保护伞下。 bean 定义如下:

@Bean({"mqListenerContainerFactory"})
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.connectionFactory());
    factory.setTransactionManager(this.jmsTransactionManager());
    return factory;
}

@Bean
public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
}

@Bean("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<?, ?> chainedKafkaTransactionManager(
        JmsTransactionManager jmsTransactionManager, KafkaTransactionManager kafkaTransactionManager) {

    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager, jmsTransactionManager);
}

@Transactional(transactionManager = "chainedKafkaTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "${myApp.sourceQueue}", containerFactory = "mqListenerContainerFactory")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Processing the message here then publishing it to Kafka using KafkaTemplate
    kafkaTemplate.send(sourceTopic,transformedMessage);

    // Then throw an exception just to test the transaction behaviour
    throw new RuntimeException("Not good Pal!");
}

运行应用程序时,发生的情况是消息不断回滚到 MQ 队列中,但 Kafka 主题中的消息不断增长,这对我来说意味着 kafkaTemplate 交互不会回滚。

如果我根据文档理解得很好,情况就不应该是这样。 “如果事务处于 Activity 状态,则在事务范围内执行的任何 KafkaTemplate 操作都会使用事务的生产者。”

在我们的 application.yaml 中,我们通过设置 spring.kafka. Producer.transaction-id-prefix 将 Kafka 生产者配置为使用事务

问题是我在这里缺少什么以及我应该如何解决它。 预先感谢您的投入。

最佳答案

消费者默认可以看到未提交的记录;将 isolation.level 使用者属性设置为 read_commissed 以避免从回滚事务接收记录。

关于java - 无法使用ChainedKafkaTransaction同步Kafka和MQ事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61926306/

相关文章:

java - Spring Security 记住我不工作

spring-boot - Spring boot 中配置服务器的多个搜索路径

apache-kafka - Kafka : Continuously getting FETCH_SESSION_ID_NOT_FOUND

apache-kafka - kafka按时间戳获取记录,消费者循环

java - Android:使用 FLAG ACTIVITY CLEAR TOP 时上一个 Activity 未完成

Java如何在通过Servlet类生成的jsp中刷新验证码

java - 使用 for 循环搜索值和

java - Spring AOP 和 Spring Boot - 创建代理但未执行建议

java - 你如何参数化 Spring Boot Gradle 插件?

java - 如何在java中传递bash脚本的参数?