我正在尝试使用高级消费者批量读取 Kafka 主题中的消息。 在这批读取期间,我的线程必须在某个时刻停止。
Either, once all the messages in the topic are exhausted. or Get the max offset at the point when the messages are about to be read and stop till that max offset is reached.
我尝试使用high-level-consumer处的代码但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息进入。
所以 3 个问题,
我如何知道该主题不再有消息可供读取?
如果我对上述问题有了答案,我该如何阻止它再收听该主题?
有没有办法在批量读取开始时找到最大偏移量(我认为简单的消费者可以做到这一点)并让高级消费者在该点停止?
最佳答案
您可以选择在指定时间内没有新消息到达时,您认为所有消息均已阅读。这可以使用消费者属性consumer.timeout.ms进行设置。当这个指定的值过去而没有任何新消息到达时,ConsumerIterator 将抛出一个超时异常,您可以在消费者中处理该异常并退出。
关于java - 卡夫卡高级消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29110909/