spring-boot - 如何在kafka监听器方法中寻找特定的偏移量?

标签 spring-boot apache-kafka kafka-consumer-api offset spring-kafka

我正在尝试在我的 kafka 监听器方法中寻找 SQL 数据库的偏移量。 我在代码中使用了 registerSeekCallback 方法,但是当我们运行使用者(或启动容器)时会调用此方法。假设我的消费者正在运行,并且 MySql 数据库中最后提交的偏移量为 20。我手动将 Mysql 数据库中最后提交的偏移量更改为 11,但我的消费者将继续从 21 开始读取,除非我重新启动我的消费者(容器已重新启动)。如果我可以在监听器方法本身中覆盖或寻求偏移量,我正在寻找任何选项。任何帮助将不胜感激。

public class Listen implements ConsumerSeekAware 
{
 @Override
    public void registerSeekCallback(ConsumerSeekCallback callback)

    {
//      fetching offset from a database 
        Integer offset = offsetService.getOffset();
        callback.seek("topic-name",0,offset);

    }
 @KafkaListener(topics = "topic-name", groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment) throws Exception 
  {
//    processing the record 

      acknowledgment.acknowledge();    //manually commiting the record
//    committing the offset to MySQL database
  }
}

使用新的监听器方法进行编辑:-

@KafkaListener(topics = "topic-name", groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment, 
  @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer)) throws Exception {
       // seeking old offset stored in database (which is 11 )
        consumer.seek(partition,offsetService.getOffset());
        log.info("record offset is {} and value is {}" , record.offset(),record.value() );
        acknowledgment.acknowledge();
}

在数据库中,我最后提交的偏移量是11,kafka端最后提交的偏移量是21。当我在kafka主题中写入新记录(即偏移量22)时,我的消费者首先触发并处理22偏移量,然后返回到寻找偏移量 11 并从那里开始处理。 尽管我正在寻找偏移量 11,但为什么它首先消耗偏移量 22?

使用上面的代码,每次我向我的 kafka top 写入一条新消息时,它都会首先处理该记录,然后查找数据库中存在的偏移量。有什么办法可以避免这种情况吗?

最佳答案

this answer中有多种技巧.

请记住,对使用者执行查找要到下一次轮询才会生效(上次轮询中获取的任何记录将首先发送给使用者)。

编辑

这是一个例子:

@SpringBootApplication
public class So63429201Application {

    public static void main(String[] args) {
        SpringApplication.run(So63429201Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, Listener listener) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so63429201", i % 3,  null, "foo" + i));
            Thread.sleep(8000);
            listener.seekToTime(System.currentTimeMillis() - 11000);
            Thread.sleep(8000);
            listener.seekToOffset(new TopicPartition("so63429201", 0), 11);
            Thread.sleep(8000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63429201").partitions(3).replicas(1).build();
    }

}

@Component
class Listener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "so63429201", topics = "so63429201", concurrency = "2")
    public void listen(String in) {
        System.out.println(in);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        System.out.println(assignments);
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToBeginning(assignments.keySet());
    }

    public void seekToTime(long time) {
        getSeekCallbacks().forEach((tp, callback) -> callback.seekToTimestamp(tp.topic(), tp.partition(), time));
    }

    public void seekToOffset(TopicPartition tp, long offset) {
        getSeekCallbackFor(tp).seek(tp.topic(), tp.partition(), offset);
    }

}

关于spring-boot - 如何在kafka监听器方法中寻找特定的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63429201/

相关文章:

java - 非唯一列上的 HIbernate onetoMany 映射

java - 如何在 spring boot 中启用/禁用特定的休息端点?

apache-kafka - Spring Kafka 消费者/听众组

apache-spark - 是否可以在 Kafka+Spark Streaming 中获取特定的消息偏移量?

java - Kafka 消费者在可完成的 future 内抵消提交

logging - 如何让 Spring Boot 使用 log4j.xml 配置文件?

jsf - NoClassDefFoundError 在 Spring-boot 中运行 JSF

python - 使用 pem key 和客户端证书的 KAFKA SSL 连接

java - 如何从 PCollection<String> 创建 PCollection<Row> 以执行波束 SQL 转换

java - 对标 Kafka - 性能一般