java - Kafka 消费者手册提交偏移量

标签 java spring apache-kafka spring-integration spring-integration-dsl

我正在实现和 dsl spring 集成流程,它从 Kafka 获取消息

代码片段:

return IntegrationFlows.from(
                Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
                        kafkaPropertiesConfiguration.getTelemetryDataTopic()))
                })
                .handle(bla.someImportantOperation())
                //TODO:do manual commit here
                //.handle(consumer.commitSync())

                .get();

我想知道如何手动提交同步,但只有在 .handle(bla.someImportantOperation()) 成功完成之后。

我不知道如何获取消费者引用,因为我使用的是 DefaultKafkaConsumerFactory,希望得到任何帮助。

这些是我用来创建消费者的 consumerProperties:

consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());

consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

最佳答案

Kafka.messageDrivenChannelAdapter() 为您提供了一个配置器 Hook :

.configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.MANUAL))

注意我提供的选项。

阅读它的 Javadoc,然后阅读 AcknowledgingMessageListener。 提到了 Acknowledgement。这个通过 KafkaHeaders.ACKNOWLEDGMENT 出现在消息头中。

所以,你在 //.handle(consumer.commitSync()) 中需要的就是这样的:

.handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())

在 Spring 中查看 Apache Kafka 文档的更多信息:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets

关于java - Kafka 消费者手册提交偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53173913/

相关文章:

spring-boot - KafkaTemplate 线程安全吗

java - Kafka 流到主题

java - JUnit Ant 任务不会输出到屏幕

Spring 批处理与 quartz 工作?

apache-kafka - 如何在 Kafka 0.10 中找到主题分区的偏移范围?

java - spring:使用@Autowired和context:component-scan自动连接原型(prototype)bean时如何使用非默认构造函数?

spring - JUnit:如何按照 Spring 的意图访问 Spring 配置?

java - 如何等待 css 属性更改?

java - 在消息驱动 Bean (MDB) 中生成新的 Java 线程

java - 在 If 条件中匹配 Java 中的枚举字符串