我正在尝试使用 assign()
方法使用来自多个主题的消息。通过我的实现,有时我能够使用来自所有主题的消息,有时我只能使用一个主题的消息。经过一番研究,我发现 Kafka 默认使用 Range 分配器。因此它不会总是分配所有分区。
对于我的用例,我应该能够从所有主题和分区进行消费。
我尝试过设置 RoundRobin 分配器。但这并没有帮助
List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) {
topicPartitions.add(new TopicPartition(topic, 0);
}
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`
最佳答案
KafkaConsumer.assign 通常用于复杂的用例,在这些用例中,您不仅想要控制主题,还想要控制您使用的分区。如果您只想从多个主题(及其所有分区)进行消费,您应该使用 KafkaConsumer.subscribe。
consumer.subscribe(Arrays.asList("topic1", "topic2"));
查看 javadoc javadoc其中还显示了代码示例。
编辑:如果您需要控制分区分配,那么您确实需要使用 allocate() 方法,但在您的(不完整)代码示例中,看起来您分配了每个主题的分区 0 ;因此您将只使用来自分区 0 的消息。
如果您需要手动控制偏移量,您仍然可以使用订阅,但您可以禁用自动提交并使用seek()和commitSync()或commitAsync()来控制偏移量。
关于java - 如何消费多个主题的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56634416/