java - 如何在读取 kafka 主题时验证无效的分区名称

标签 java apache-kafka kafka-consumer-api

我正在编写一些代码来根据分区号和偏移量从Kafka Topic读取特定消息,代码如下

TopicPartition topicPartition = new TopicPartition(this.topicName, partition);
this.consumer.assign(Collections.singletonList(topicPartition));
this.consumer.seek(topicPartition, offset);
ConsumerRecords<Object, Object> records = this.consumer.poll(50000L);

现在,如果在 partition 变量中传递了错误的值,我希望代码抛出错误,但现在它可以工作,只是不获取任何记录。所以我无法区分什么时候没有给定输入的记录,或者给定的输入是错误的。

最佳答案

您可以使用检索集群元数据的方法之一来查找现有分区。

例如,您可以调用partitionsFor() ,或listTopics()获取主题的现有分区列表。然后您可以检测提供的分区是否存在。

关于java - 如何在读取 kafka 主题时验证无效的分区名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68903024/

相关文章:

apache-spark - 是否可以在 Kafka+Spark Streaming 中获取特定的消息偏移量?

java - 消费者在重新启动之前无法从新创建的分区进行轮询

hadoop - 通过 nifi 将 Hive 表中的数据放入 kafka 主题

ssl - 如何在关闭 TLS 对等验证的情况下使用 Kafka

java - 在java代码中检查Kafka消费者组的活跃度

java - 将 Jpanel 放置在 Jframe 或另一个 Jpanel 的中间

java - 流式作业与循环批处理作业使用 Kafka 队列中的数据

java - 为什么对 treeSet 使用自定义 Comparator 会破坏 String 对象的相等性?

java - 选择所有YouTube视频

java - JPA分页: setFirstResult() and setMaxResults() : real pagination?