我使用 Spring Kafka API使用手动偏移量管理实现 Kafka 消费者:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
在这里,我希望消费者仅在 someCondition
成立时提交偏移量。否则,消费者应该睡一会儿,然后再次阅读相同的消息。
卡夫卡配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
使用当前配置,如果 someCondition == false
,消费者不会提交偏移量,但仍会读取下一条消息。如果 Kafka acknowledgement
没有执行,有没有办法让消费者重新阅读消息?
最佳答案
正如@Gary 已经指出的那样,您的方向是正确的,seek()
是做到这一点的方法。今天遇到这个问题时,我找不到它的代码示例。这是任何想要解决问题的人的代码。
public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {
private ConsumerSeekCallback consumerSeekCallback;
@Override
public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
if (/*some condition*/) {
//process
acknowledgment.acknowledge(); //send ack
} else {
consumerSeekCallback.seek("your.topic", record.partition(), record.offset());
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.consumerSeekCallback = consumerSeekCallback;
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
}
关于java - 多次从 Kafka 读取同一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39536012/