我的 Spring Boot 应用程序中有 3 个监听器。只有一名听众应该从头开始阅读主题。如果我添加到 yml 文件中: spring.kafka.consumer.auto-offset-reset: Early
那么它适用于所有监听器,但我只需要它。我添加了:
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_DOC;
......
@KafkaListener(groupId = "${random.uuid}",
properties = {AUTO_OFFSET_RESET_DOC + ":earliest"})
但它不起作用,设置没有被拾取,因为我在启动时看到打印的设置:
ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
有什么想法可以做到吗?
最佳答案
您提供的配置错误,应该是 AUTO_OFFSET_RESET_CONFIG
而不是 AUTO_OFFSET_RESET_DOC
@KafkaListener(groupId = "${random.uuid}",
properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"})
或者您可以直接指定属性
@KafkaListener(groupId = "${random.uuid}",
properties = {"auto.offset.reset = earliest"})
来自文档 @KafkaListener注释有一个名为 properties
的字段,它接受字符串数组
Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).
支持的语法
The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file:
key=value
key:value
key value
group.id and client.id are ignored.
关于java - 多个监听器的 KafkaListener ConsumerConfig AUTO_OFFSET_RESET_DOC 最早,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163953/