java - Kafka 弹性 - 组协调器

标签 java spring apache-kafka

据我了解,其中一位经纪人被选为负责消费者再平衡的组协调员。

Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group

我有 3 个节点,复制因子为 3 和 3 个分区。 一切都很好,当我在非协调器节点上杀死 kafka 时,消费者仍在接收消息。

但是当我用协调器杀死那个特定节点时,重新平衡没有发生,我的 java 消费者应用程序没有收到任何消息。

2018-05-29 16:34:22.668 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:22.689 INFO  AbstractCoordinator:600 - Marking the coordinator host:9092 (id: 2147483646 rack: null) dead for group good_group
2018-05-29 16:34:22.801 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:22.832 INFO  AbstractCoordinator:600 - Marking the coordinator host:9092 (id: 2147483646 rack: null) dead for group good_group
2018-05-29 16:34:22.933 INFO  AbstractCoordinator:555 - Discovered coordinator host:9092 (id: 2147483646 rack: null) for group good_group.
2018-05-29 16:34:23.044 WARN  ConsumerCoordinator:535 - Auto offset commit failed for group good_group: Offset commit failed with a retriable exception. You should retry committing offsets. 

我做错了什么吗?有什么办法可以解决这个问题吗?

最佳答案

But when I kill that specific node with coordinator, rebalancing is not happening and my java consumer app does not receive any messages.

组协调器接收来自消费者组中所有消费者的心跳。它维护一个活跃消费者列表,并根据此列表的更改启动重新平衡。然后组长执行再平衡 Activity 。

这就是为什么如果你杀死组协调器,重新平衡将停止的原因。

更新

在组协调器代理关闭的情况下,Zookeeper 将收到通知并开始选举以自动从 Activity 代理中提升新的组协调器。所以与小组协调员无关。让我们看看日志:

2018-05-29 16:34:23.044 WARN  ConsumerCoordinator:535 - Auto offset commit failed for group good_group: Offset commit failed with a retriable exception. You should retry committing offsets.

内部topic__consumer_offset的replication factor可能有默认值1。你能看看default.replication.factoroffsets.topic是什么值吗? replication.factor 在 server.properties 文件中。如果默认值为 1,则应将其更改为更大的值。如果不这样做,组协调器将关闭,导致偏移量管理器在没有备份的情况下停止。所以commit offset这个activity是做不出来的。

关于java - Kafka 弹性 - 组协调器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50593008/

相关文章:

java - Java 中的基础数学

扩展参数类/接口(interface)的java基类

java - 无法实现继承的抽象方法

java - 二叉树的递归代码流程

java - 如何将 url 参数绑定(bind)到 jax-rs 中的 Controller 参数对象

apache-kafka - 如何将 kafkacat 有效负载打印输出转换为二进制

java - 直接读取 index.html 而不在 URL 中暴露其路径

java - 卡在 ControllerLinkBuilder 上

scala - Spark 2.2.0独立模式写入Dataframe到本地单节点Kafka时出错

maven - 从maven部署到nexus时指定版本