我目前正在使用Spring Integration Kafka做实时统计。但是,组名使 Kafka 搜索监听器未读取的所有先前值。
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
我想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?
最佳答案
因为我没有看到这方面的任何例子,所以我将在这里解释我是如何做到的。
您的@KafkaListener
类必须实现一个ConsumerSeekAware
类,这将允许监听器在分配分区时控制偏移查找。 (来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek)
public class KafkaMessageListener implements ConsumerSeekAware {
@KafkaListener(topics = "your.topic")
public void listen(byte[] payload) {
// ...
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
在这里,在重新平衡时,我们使用给定的回调来寻找所有给定主题的最后一个偏移量。感谢 Artem Bilan (https://stackoverflow.com/users/2756547/artem-bilan) 指导我找到答案。
关于java - Spring Kafka - 如何使用组 ID 将偏移量重置为最新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47777395/