我正在尝试关闭 kafka 中的自动提交,而是手动执行。为此,在我的 application.properties
中,我设置了 spring.kafka.properties.enable.auto.commit=false
我目前还有一个具有以下 header 的方法:
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers)
我的理解是,为了手动提交,我需要访问 Acknowledgement
对象,该对象将作为参数传递给我的 receive()
方法。我的问题:如果我将标题更改为
@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
@Headers MessageHeaders headers,
Acknowledgment acknowledgment)
确认
会自动传入,还是我需要进行其他更改?
最佳答案
是的,这样一个 Acknowledgment
实例将被传递到您的监听器方法中。成功处理收到的消息后,您应该调用 acknowledgement.acknowledge();
(仅当您想手动确认时才需要)
我还会切换到MANUAL
确认模式并关闭自动提交(您已经做了),例如通过提供自定义 Spring Boot 配置类 - 也可以通过 application.properties
进行配置:
@Configuration
class KafkaConfiguration {
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
}
}
如果您不想手动确认,那么不同的确认模式可能更方便且更适合:
AckMode.RECORD
非常舒服,因为如果监听器的方法实现成功完成(不抛出异常),则传递到监听器方法的 Kafka 记录将自动被确认。
关于java - 将确认传递给 spring KafkaListener 消费者方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59740441/