java - 将确认传递给 spring KafkaListener 消费者方法

标签 java spring-kafka

我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties 中,我设置了 spring.kafka.properties.enable.auto.commit=false

我目前还有一个具有以下 header 的方法:

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers)

我的理解是,为了手动提交,我需要访问 Acknowledgement 对象,该对象将作为参数传递给我的 receive() 方法。我的问题:如果我将标题更改为

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers,
                    Acknowledgment acknowledgment)

确认会自动传入,还是我需要进行其他更改?

最佳答案

是的,这样一个 Acknowledgment 实例将被传递到您的监听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge(); (仅当您想手动确认时才需要)

我还会切换到MANUAL确认模式并关闭自动提交(您已经做了),例如通过提供自定义 Spring Boot 配置类 - 也可以通过 application.properties 进行配置:

@Configuration
class KafkaConfiguration {

        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

            final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
            consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);

            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.getContainerProperties().setAckMode(MANUAL);

            return factory;
        }
    }
}

如果您不想手动确认,那么不同的确认模式可能更方便且更适合:

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

AckMode.RECORD 非常舒服,因为如果监听器的方法实现成功完成(不抛出异常),则传递到监听器方法的 Kafka 记录将自动被确认。

关于java - 将确认传递给 spring KafkaListener 消费者方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59740441/

相关文章:

Java,将不同对象的列表保存到SD卡

java - 无法检测测试类的默认资源位置

java - 在 try block 中返回一个字符串

java - 如何针对故障点确保生产者和消费者应用程序之间的kafka事务同步?

docker - 如何更改 Spring Cloud Stream Kafka binder 的目标?

java - Java中的 boolean 值与 boolean 值

java - 正则表达式删除两个字符串之间的子字符串

java - 当引导服务器关闭时,具有 transactionIdPrefix 的 DefaultKafkaProducerFactory 会无限等待

java - Spring Cloud Stream 多主题事务管理

java - 如何将 kafka 消息反序列化到 POJO?