java - 如何消费多个主题的消息?

标签 java apache-kafka kafka-consumer-api

我正在尝试使用 assign() 方法使用来自多个主题的消息。通过我的实现,有时我能够使用来自所有主题的消息,有时我只能使用一个主题的消息。经过一番研究,我发现 Kafka 默认使用 Range 分配器。因此它不会总是分配所有分区。

对于我的用例,我应该能够从所有主题和分区进行消费。

我尝试过设置 RoundRobin 分配器。但这并没有帮助

List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) {
   topicPartitions.add(new TopicPartition(topic, 0);
}
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`

最佳答案

KafkaConsumer.assign 通常用于复杂的用例,在这些用例中,您不仅想要控制主题,还想要控制您使用的分区。如果您只想从多个主题(及其所有分区)进行消费,您应该使用 KafkaConsumer.subscribe。

consumer.subscribe(Arrays.asList("topic1", "topic2"));

查看 javadoc javadoc其中还显示了代码示例。

编辑:如果您需要控制分区分配,那么您确实需要使用 allocate() 方法,但在您的(不完整)代码示例中,看起来您分配了每个主题的分区 0 ;因此您将只使用来自分区 0 的消息。

如果您需要手动控制偏移量,您仍然可以使用订阅,但您可以禁用自动提交并使用seek()和commitSync()或commitAsync()来控制偏移量。

关于java - 如何消费多个主题的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56634416/

相关文章:

apache-kafka - 每个分区都有多线程 Kafka 消费者,是否可能并推荐,如果有任何示例片段?

hadoop - 流式传输文件夹中的文件

java - C++保存和导入外部可执行结果而不写入磁盘

Java 并发 : Threads notifications

java - 通过覆盖 Arrays.sort() 对对象数组进行排序

apache-kafka - 卡夫卡错误 : Could not find or load main class org. apache.kafka.clients.tools.ProducerPerformance

apache-camel - 目的和区别 b/w Apache camel Kafka 消费者 URI 选项 consumerStreams 与 consumerCount

java - 如何获取 iText7 上表单字段的字体大小?

java - 如何解决 Kafka Broker 中的网络和内存问题?

docker - Kafka 消费者组偏移量下降到 -1