如果我在线程内运行kafka
消费者而不从外部操作它,比如让消费者进入休眠状态或唤醒他,则有必要处理WakeupException
适本地?处理它的好方法是什么?
消费者在网络服务上运行,不断地从队列中提取数据,并且永远不应该停止这样做。此外,该服务没有空闲或挂起状态。在Kafka的文档中指出,只有当kafka消费者被另一个线程阻塞时才会抛出异常,但这种情况永远不会发生。 https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html
卡夫卡版本0.10.0.0
catch (WakeupException e) {
LOG.info("Kafka Consulmer wakeup exception");
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
问候, 拉克什
最佳答案
您可以在 Confluence 文档中找到一些示例,这些示例展示了如何正确处理 WakeupException [Confluent Consumer Doc]
基本上,如果您使用 consumer.poll(Integer.MAX_VALUE)
消费者将阻塞,直到获取消息。在这种情况下,如果您想停止消费,可以调用 consumer.wakeup()
(来自另一个线程)并捕获异常以正确关闭。
此外,在同步提交偏移量时,调用 consumer.wakeup()
会抛出 WakeupException
.
关于java - Kafka Consumer WakeupException 处理 Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37728078/