java - Kafka 模式订阅。新主题未触发再平衡

标签 java apache-kafka kafka-consumer-api

根据 kafka javadocs 上的文档如果我:

  • 订阅模式
  • 创建一个符合模式的主题

应该会发生重新平衡,从而使消费者从该新主题中读取内容。但这并没有发生。

如果我停止并启动消费者,它确实会选择新主题。所以我知道新主题与模式匹配。 https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics 中可能有此问题的重复项但这个问题毫无意义。

我看到了 kafka 日志并且没有错误,它只是没有触发重新平衡。当消费者加入或死亡时会触发重新平衡,但不会在创建新主题时触发(即使将分区添加到现有主题时也不会,但那是另一个主题)。

我正在使用 kafka 0.10.0.0 和“新消费者 API”的官方 Java 客户端,这意味着代理 GroupCoordinator 而不是胖客户端 + zookeeper。

这是示例消费者的代码:

public class SampleConsumer {
public static void main(String[] args) throws IOException {
    KafkaConsumer<String, String> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
        Properties properties = new Properties();
        properties.load(props);
        properties.setProperty("group.id", "my-group");

        System.out.println(properties.get("group.id"));
        consumer = new KafkaConsumer<>(properties);
    }
    Pattern pattern = Pattern.compile("mytopic.+");
    consumer.subscribe(pattern, new SampleRebalanceListener());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s %s\n", record.topic(), record.value());
        }
    }
}

在生产者中,我将消息发送到名为 mytopic1、mytopic2 等的主题。

如果没有触发再平衡,模式几乎没有用。

您知道为什么再平衡没有发生吗?

最佳答案

文档提到“模式匹配将针对检查时存在的主题定期进行。”。事实证明,“定期”对应于 metadata.max.age.ms 属性。通过将该属性(在我的代码示例中的“consumer.props”内)设置为 5000,我可以看到它每 5 秒检测一次新主题和分区。

这是设计的,根据这个 jira ticket https://issues.apache.org/jira/browse/KAFKA-3854 :

The final note on the JIRA stating that a later created topic that matches a consumer's subscription pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat subscribe() to the same pattern would be needed to handle that case.

刷新元数据轮询执行票证中提到的“重复订阅()”。

这是来自 Kafka 0.8 的令人困惑的地方,那里有基于 zookeper watches 的真正触发,而不是轮询。 IMO 0.9 更像是针对这种情况的降级,而不是“及时”重新平衡,这变成了带有开销的高频轮询,或者在对新主题/分区使用react之前需要很长时间的低频轮询。

关于java - Kafka 模式订阅。新主题未触发再平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38754865/

相关文章:

Java:按名称实例化具体实现

apache-kafka - 如何将 kafkacat 有效负载打印输出转换为二进制

apache-kafka - Kafka 最优保留和删除策略

java - 什么数据格式被认为在 Kafka 上写入速度最快?

Java X Y 坐标

java - Apache Solr 中是否有 MySQL 的存储过程替代方案?

java - 如何解决org.hibernate.NonUniqueObjectException : a different object with the same identifier value was already associated with the session:

java - 如何实现 FlinkKafkaPartitioner?

json - 我是否需要自定义序列化程序来在 kafka 中生成 JSON 消息?

apache-kafka - 最近的 Kafka 版本中的消费者组列表存储在哪里?