java - 将kafka-consumer设置为consumer-group中的容灾消费者?

标签 java apache-kafka kafka-consumer-api

我有一个 Kafka 基础设施,其主题只有一个分区,2 个消费者将订阅该分区。第一个消费者配置在应用程序的主实例中,第二个消费者是应用程序灾难恢复实例的一部分。
现在这里的问题是,我希望第二个消费者充当故障转移消费者,即如果应用程序消费者的主实例出现故障,故障转移应用程序的消费者应该开始使用消息。
我可以做的一件事是,在第二个消费者方面,我可以通过调用 Thread#Sleep 来引入延迟。 , 这样分区就会分配给主实例

 // called at the consumer bean initialization
 void subscribeConsume() {
    // leaving exception handling for brevity
    if(application.type.equals("BackupApp"))  {
       TimeUnit.MINUTE.sleep(5);
    }
     ...
    // subscription logic
 }
但这似乎是一种解决方法/hack,而不是一个强大的解决方案。消费者库中是否有任何可以在这里提供帮助的 Kafka 属性。

最佳答案

如果您有一台 kafka 服务器具有一个具有单个分区的主题和两个消费者使用同一主题,默认情况下,一次只有一个消费者能够接收消息。因为它只有一个分区。当该消费者关闭时, session 超时(默认值为 5 分钟)后,消费者重新平衡将被启动,您的第二个消费者将能够接收相同主题的消息。现在你已经提到你有 kafka 基础设施,如果你提到基础设施是否有 kafka 集群处于故障转移配置中,这将是有意义的。

关于java - 将kafka-consumer设置为consumer-group中的容灾消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61711816/

相关文章:

java - 确保并发对象调用而不加锁的最佳方法是什么

javascript - 在 webview 中显示网页的一部分

java - mysql数据库搜索文本真的很慢

java - 是否可以在 java 中获取默认网关 IP 和 MAC 地址?

java - 使用融合模式注册表注册 AVRO 模式

windows - 尝试在 Windows 中启动 Zookeeper 时出现 "log4j.properties was unexpected at this time"

amazon-web-services - 使用自托管 Kafka 作为 AWS Lambda 的事件源

Python Kafka消费者读取已读消息

apache-kafka - Kafka Consumer Group Id 和消费者再平衡问题

java - 延迟 Kafka 主题中的一些记录