java - 多个监听器的 KafkaListener ConsumerConfig AUTO_OFFSET_RESET_DOC 最早

标签 java spring apache-kafka listener consumer

我的 Spring Boot 应用程序中有 3 个监听器。只有一名听众应该从头开始阅读主题。如果我添加到 yml 文件中: spring.kafka.consumer.auto-offset-reset: Early 那么它适用于所有监听器,但我只需要它。我添加了:

import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_DOC;
......
@KafkaListener(groupId = "${random.uuid}",
            properties = {AUTO_OFFSET_RESET_DOC + ":earliest"})

但它不起作用,设置没有被拾取,因为我在启动时看到打印的设置:

ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest

有什么想法可以做到吗?

最佳答案

您提供的配置错误,应该是 AUTO_OFFSET_RESET_CONFIG 而不是 AUTO_OFFSET_RESET_DOC

@KafkaListener(groupId = "${random.uuid}",
        properties = {AUTO_OFFSET_RESET_CONFIG + ":earliest"})

或者您可以直接指定属性

@KafkaListener(groupId = "${random.uuid}",
        properties = {"auto.offset.reset = earliest"})

来自文档 @KafkaListener注释有一个名为 properties 的字段,它接受字符串数组

Kafka consumer properties; they will supersede any properties with the same name defined in the consumer factory (if the consumer factory supports property overrides).

支持的语法

The supported syntax for key-value pairs is the same as the syntax defined for entries in a Java properties file:

key=value
key:value
key value

group.id and client.id are ignored.

关于java - 多个监听器的 KafkaListener ConsumerConfig AUTO_OFFSET_RESET_DOC 最早,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163953/

相关文章:

java - appcompat_v7 和 android-support-v7-appcompat 之间的区别?

java - 无法从实体类 Hibernate 4.2.7 Websphere 8.5.5 oracle 11g 创建表

java - 使用 Spring MVC、JPA 和 Hibernate 添加 ManyToMany

java - Spring ServletContext 返回 null

java - 使用不同的字符和模式分割字符串

java - 无法从数据源获取数据库连接

sql - 设置过高的批量大小有什么缺点?

ruby-on-rails - 如何将Kafka客户端添加到Rails Docker部署中?

apache-kafka - 如何使用 kafka 0.10.x 获取所有组列表

apache-kafka - 以编程方式查找kafka主题大小