java - Kafka Consumer WakeupException 处理 Java

标签 java apache-kafka kafka-consumer-api

如果我在线程内运行kafka消费者而不从外部操作它,比如让消费者进入休眠状态或唤醒他,则有必要处理WakeupException适本地?处理它的好方法是什么?

消费者在网络服务上运行,不断地从队列中提取数据,并且永远不应该停止这样做。此外,该服务没有空闲或挂起状态。在Kafka的文档中指出,只有当kafka消费者被另一个线程阻塞时才会抛出异常,但这种情况永远不会发生。 https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/errors/WakeupException.html

卡夫卡版本0.10.0.0

catch (WakeupException e) {
    LOG.info("Kafka Consulmer wakeup exception");
    // Ignore exception if closing
    if (!closed.get()) {
        throw e;
    }
} finally {
    consumer.close();
}

问候, 拉克什

最佳答案

您可以在 Confluence 文档中找到一些示例,这些示例展示了如何正确处理 WakeupException [Confluent Consumer Doc]

基本上,如果您使用 consumer.poll(Integer.MAX_VALUE)消费者将阻塞,直到获取消息。在这种情况下,如果您想停止消费,可以调用 consumer.wakeup() (来自另一个线程)并捕获异常以正确关闭。

此外,在同步提交偏移量时,调用 consumer.wakeup()会抛出 WakeupException .

关于java - Kafka Consumer WakeupException 处理 Java,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37728078/

相关文章:

java - 使用 EclipseLink 获取数据库备份/转储

java - 将 Spring 3.1 与 RequestMappingHandlerAdapter 一起使用时出现默认构造函数错误

apache-kafka - kafka-consumer-group描述消费者组中的结果不存在

apache-kafka - 无法访问kafka.serializer.StringDecoder

java - 动态消费来自kafka主题的消息

java - Kafka 获取偏移数据时出错。原因: 1

apache-kafka - 理解kafka、消费者群体和主题

java - 我可以在不与 @Service 进行数据交互的情况下注释类吗?

java - 服务器到 Firebase HTTP POST 结果为响应消息 200

java - kafka 8 和内存 - 内存不足,Java 运行时环境无法继续