Java Kafka 消费者组未能消费一些消息

标签 java apache-kafka

注意到一个问题,其中 Kafka 消费者组(用 java 实现)始终错过来自代理的一些消息。作为调试的第一线,通过 kafka 控制台消费者,我可以看到代理中可用的那些消息。

Kafka代理版本:0.10.1.0

Kafka客户端版本:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
</dependency>

Kafka 消费者配置:

Properties props = new Properties();
props.put("bootstrap.servers","broker1,broker2,broker3");
props.put("group.id", "myGroupIdForDemo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("heartbeat.interval.ms", "25000"); 
props.put("session.timeout.ms", "30000"); 
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "1");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.sync.time.ms", "10000");
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "earliest");
props.put("consumer.timeout.ms", "-1");
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "6000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

编辑 - 添加更多信息

我想补充一些信息: 共有 6 个分区。但是,具有相同消费者组 ID 的主题的消费者总数为 40。我确实理解有 34 位消费者无所事事。

但是,我想了解的方面是,如果消费者未能发送心跳到broker认为已死并重新分配分区的程度,那么空闲的消费者是否有机会消费消息?这个消息未被消费的问题总是只在某些分区中被注意到。我的意思是无法从同一分区传递/使用消息。

感谢任何帮助。谢谢。

最佳答案

a) 即使在 Kafka 中也可能不存在消息 - 在这种情况下,请检查消息大小是否不超过 kafka 代理配置中允许的最大消息大小。

b) 如果您的消费者连接到 Kafka 实例 1 而 2-d 实例未连接,您可能会错过来自 2-d kafka 的消息:因此,在消费者连接字符串中指定所有代理。

3)如果kafka上存在消息并且你连接了,你可能无法反序列化消息,所以,尝试另一个反序列化器,可能不是字符串,而是字节数组,看看会发生什么,消息会被消费吗?如果是,则转换为字符串有问题。

4) 消息可能被另一个工作的消费者“窃取”,在同一个组 ID 下工作,选择唯一的组 ID。

5) 您使用什么记录器来查看已消耗的消息?您不怀疑这是记录器问题吗?

6) 您可能会在消费所有消息之前杀死/停止消费者?

7) 可能是您消费了,但由于消费者内存限制而失败了?我是增加-Xmx。 (堆大小)

关于Java Kafka 消费者组未能消费一些消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47939190/

相关文章:

apache-kafka - Kafka使用confluence命令启动失败

apache-kafka - 保留配置偏移量.retention.minutes和log.retention.minutes之间的差异

java - 基于阿拉伯语 desc 的 map 排序无法使用 java 正常工作

java - 如何将3个字母的小写字母向后移动?

java - 用Java对二维数组排序-跳过第一个索引

java - Kafka 消费者抛出 java.lang.OutOfMemoryError : Direct buffer memory

java - 在 Kafka Consumer 中反序列化 Avro 数据包时出现堆空间问题

java - 无法加载类路径上的 jar 文件

java - Jslider从JButton周期性改变值时无法勾选

java - 构建 Confluence IO 模式注册表源时出现问题