java - 卡夫卡高级消费者

标签 java apache-kafka kafka-consumer-api

我正在尝试使用高级消费者批量读取 Kafka 主题中的消息。 在这批读取期间,我的线程必须在某个时刻停止。

Either, once all the messages in the topic are exhausted. or Get the max offset at the point when the messages are about to be read and stop till that max offset is reached.

我尝试使用high-level-consumer处的代码但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息进入。

所以 3 个问题,

  1. 我如何知道该主题不再有消息可供读取?

  2. 如果我对上述问题有了答案,我该如何阻止它再收听该主题?

  3. 有没有办法在批量读取开始时找到最大偏移量(我认为简单的消费者可以做到这一点)并让高级消费者在该点停止?

最佳答案

您可以选择在指定时间内没有新消息到达时,您认为所有消息均已阅读。这可以使用消费者属性consumer.timeout.ms进行设置。当这个指定的值过去而没有任何新消息到达时,ConsumerIterator 将抛出一个超时异常,您可以在消费者中处理该异常并退出。

关于java - 卡夫卡高级消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29110909/

相关文章:

docker - Gradle 中的 Kafka 集成测试运行到 GitHub Actions

python - kafka-python KafkaConsumer 多分区提交偏移量

apache-kafka - 为多个分区使用kafka批处理

apache-kafka - Kafka : Error from SyncGroup, 请求超时

java - Swing:灰色多行JCheckBox?

java - Visual Studio Code 是否提供了一个过程/片段来创建一个类似于 Eclipse 的新 Java 类文件?

java - 通过 XPath [Selenium] 从 Span 元素中提取文本

java - 如何从锁屏以编程方式更改亮度

java - kafka 消费者 API consumer.poll() 不能正常工作,没有异常,只是阻塞

apache-kafka - Kafka Consumer:找不到连接条目