java - 多次从 Kafka 读取同一条消息

标签 java spring apache-kafka kafka-consumer-api spring-kafka

我使用 Spring Kafka API使用手动偏移量管理实现 Kafka 消费者:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

在这里,我希望消费者仅在 someCondition 成立时提交偏移量。否则,消费者应该睡一会儿,然后再次阅读相同的消息

卡夫卡配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

使用当前配置,如果 someCondition == false,消费者不会提交偏移量,但仍会读取下一条消息。如果 Kafka acknowledgement 没有执行,有没有办法让消费者重新阅读消息?

最佳答案

正如@Gary 已经指出的那样,您的方向是正确的,seek() 是做到这一点的方法。今天遇到这个问题时,我找不到它的代码示例。这是任何想要解决问题的人的代码。

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;


    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {

            consumerSeekCallback.seek("your.topic", record.partition(), record.offset());

        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}

关于java - 多次从 Kafka 读取同一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39536012/

相关文章:

java - 如何使用 UriInfo 测试 JERSEY Controller 方法

java - Spring Boot 将事务(和数据库连接)传播到 @Async 方法

java - 状态 404 - MockMvc 授权 - Spring boot REST 文档

apache-kafka - Kafka 异步提交偏移复制

mongodb - Kafka Mongodb 接收器连接器 - 更新文档

java - 如何从jar文件动态加载类

java - 是否有适用于 Netbeans 7.3 的 google app engine 插件

Java 的 instanceof 奇怪行为

java - 泽西警告 - 无法解析为具体类型

apache-kafka - Kafka 在某些节点上分区不同步