java - Spring Kafka - 如何使用组 ID 将偏移量重置为最新?

标签 java spring apache-kafka spring-integration spring-kafka

我目前正在使用Spring Integration Kafka做实时统计。但是,组名使 Kafka 搜索监听器未读取的所有先前值。

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

我想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?

最佳答案

因为我没有看到这方面的任何例子,所以我将在这里解释我是如何做到的。

您的@KafkaListener 类必须实现一个ConsumerSeekAware 类,这将允许监听器在分配分区时控制偏移查找。 (来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {


    }
}

在这里,在重新平衡时,我们使用给定的回调来寻找所有给定主题的最后一个偏移量。感谢 Artem Bilan (https://stackoverflow.com/users/2756547/artem-bilan) 指导我找到答案。

关于java - Spring Kafka - 如何使用组 ID 将偏移量重置为最新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47777395/

相关文章:

java - 将spring持久化存储库注入(inject)到服务层

java - 从 Controller 中的结果集中排除列 | Spring 数据jpa

java - 带有 Spark 的 Kafka 抛出无法初始化类 kafka.utils.Log4jController 错误

kubernetes - 检查我的 Kafka 和 Zookeeper 功能和连接

java - 骡子 ESB : Expression Filter with regex() function

java - java中如何将字符串视为变量?

java - 如何使用java代码重启eclipse的tomcat服务器

java - 在 Controller 中抛出异常时,Spring Websecurity 在 'ignored' 资源上抛出 401

java - 除了白色标签错误页面之外什么也看不到 - Spring boot + hibernate

scala - 为什么使用 Kafka 的 Spark Streaming 应用程序失败并显示 "ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition"?