java - 我可以根据给定的分区 ID 和偏移量列表使用 kafka 消息吗?

标签 java apache-kafka

假设我有一个有 4 个分区的 kafka 主题,每个分区都有一些已提交的消息。出于某种原因,我想根据给定的分区 id 和 offset# 重播一些已提交的消息,使用 java 客户端库执行此操作的最佳方法是什么?

例如我有

主题A:

分区1:...偏移-1,偏移-3,偏移-7...

分区2:...偏移2,偏移4,偏移8...

分区3:...偏移5,偏移6,偏移9...

分区4:...偏移-10,偏移-11,偏移-12...

我只想重玩

分区1:偏移量3

分区2:偏移8

分区3:偏移5

所以我有如下的伪代码

props.put("max.poll.records", "1"); // to make sure I only get exactly one desired message on that offset 

({(1,3),(2,8),(3,5)}).stream(part_offset-> {
  int i=1; // used as loop count down latch
while(i>=0){
 consumer.assign(get_partition(part_offset.part));
 consumer.seek(new TopicPartition("TopicA", part_offset.part), part_offset.offset);
 records=consumer.pool(Duration.ofSeconds(1)); // I read somewhere kafka is lazy , so should I poll before this ?
 for ( record : records) {
  //do something
  i--;
 }

}

})

但是上面的代码不起作用,它只是卡在那里什么也不做。 只是想知道使用给定分区 ID 和偏移信息重播某些消息的最佳方法是什么?或者,我的消费方式是否错误?请指教

非常感谢

最佳答案

对不起,是我的错。在轮询时,我没有对任何现有的偏移设置限制。因此,在我设置重试上限后,下面的代码工作正常,请随意发表评论。但只是想知道使用 int 来停止这样的重试是否是最佳实践?谢谢

        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "1");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "appname-message-patching");

        List<Pair<Integer,Integer>> partition_offset= Arrays.asList(Pair.of(0,1),Pair.of(0,21),
                Pair.of(1,31) );

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        partition_offset.forEach(po-> {
            TopicPartition tp=new TopicPartition(topicName, po.getKey());
            consumer.assign(Arrays.asList(tp));
            consumer.seek(tp,po.getValue());
            int counter=1;//only get 1 message , double guards for max.poll.records ?
            int retry=5;//retry 5 times then give up if cannot receive anything
            while (counter>0 ) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                if(retry>0 || !records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        counter--;
                        System.out.printf("==================thread name = %s , partation = %d , offset = %d , key = %s , value = %s\n", Thread.currentThread().getName(), record.partition(), record.offset(), record.key(), record.value());
                    }
                    retry--;
                }
                if (retry==0||counter==0){
                    counter=0;break;
                }
            }
        });
        consumer.close();

关于java - 我可以根据给定的分区 ID 和偏移量列表使用 kafka 消息吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59367622/

相关文章:

具有批处理功能的 Java 8 Stream

c# - 测试 spy 实现之间的差异

java - 如何修改doc文档的元数据

java - 如何检查 Kafka Server 是否正在运行?

hadoop - Hive 为 HDFS 中的每个插入创建多个小文件

apache-kafka - Confluence Cloud 上的 Apache Kafka - 分区主题和消费者滞后中的不连贯偏移

java - 如何配置 MapStruct 以在无法映射枚举值时抛出异常

java - Hibernate JCache 5.4.3.Final 不适用于 JCache 5.4.2.Final 配置

apache-kafka - Kafka Connect 配置和偏移主题是否可以共享

java - Kafka 从相同的偏移量重启