假设我有一个有 4 个分区的 kafka 主题,每个分区都有一些已提交的消息。出于某种原因,我想根据给定的分区 id 和 offset# 重播一些已提交的消息,使用 java 客户端库执行此操作的最佳方法是什么?
例如我有
主题A:
分区1:...偏移-1,偏移-3,偏移-7...
分区2:...偏移2,偏移4,偏移8...
分区3:...偏移5,偏移6,偏移9...
分区4:...偏移-10,偏移-11,偏移-12...
我只想重玩
分区1:偏移量3
分区2:偏移8
分区3:偏移5
所以我有如下的伪代码
props.put("max.poll.records", "1"); // to make sure I only get exactly one desired message on that offset
({(1,3),(2,8),(3,5)}).stream(part_offset-> {
int i=1; // used as loop count down latch
while(i>=0){
consumer.assign(get_partition(part_offset.part));
consumer.seek(new TopicPartition("TopicA", part_offset.part), part_offset.offset);
records=consumer.pool(Duration.ofSeconds(1)); // I read somewhere kafka is lazy , so should I poll before this ?
for ( record : records) {
//do something
i--;
}
}
})
但是上面的代码不起作用,它只是卡在那里什么也不做。 只是想知道使用给定分区 ID 和偏移信息重播某些消息的最佳方法是什么?或者,我的消费方式是否错误?请指教
非常感谢
最佳答案
对不起,是我的错。在轮询时,我没有对任何现有的偏移设置限制。因此,在我设置重试上限后,下面的代码工作正常,请随意发表评论。但只是想知道使用 int 来停止这样的重试是否是最佳实践?谢谢
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "1");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "appname-message-patching");
List<Pair<Integer,Integer>> partition_offset= Arrays.asList(Pair.of(0,1),Pair.of(0,21),
Pair.of(1,31) );
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
partition_offset.forEach(po-> {
TopicPartition tp=new TopicPartition(topicName, po.getKey());
consumer.assign(Arrays.asList(tp));
consumer.seek(tp,po.getValue());
int counter=1;//only get 1 message , double guards for max.poll.records ?
int retry=5;//retry 5 times then give up if cannot receive anything
while (counter>0 ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if(retry>0 || !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
counter--;
System.out.printf("==================thread name = %s , partation = %d , offset = %d , key = %s , value = %s\n", Thread.currentThread().getName(), record.partition(), record.offset(), record.key(), record.value());
}
retry--;
}
if (retry==0||counter==0){
counter=0;break;
}
}
});
consumer.close();
关于java - 我可以根据给定的分区 ID 和偏移量列表使用 kafka 消息吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59367622/