java - Kafka Broker 偏移/日志保留和消费者偏移在最早模式下重置

标签 java spring-boot apache-kafka spring-cloud-stream

问题描述:

我们的 Kafka 消费者(在 Spring Boot 2.x 中开发)正在执行几天。 当我们重新启动这些消费者时,该主题的所有消息都会再次被消费,但仅限于特定条件下。

条件:

我们假设代理/主题配置(log.retention.*offsets.retention.*)和消费者配置(auto.offset.*)的组合。重置 = 最早)导致此行为。
显然我们不能将consumer设置为“latest”,因为如果consumer停止并且有新消息到达,当consumer再次启动时,这些消息将不会被消费。

问题:

避免这种情况的正确设置是什么?
在上一个 Kafka Broker 版本 (2.x) 中,log.retention.* 和 offsets.retention.* 的默认值是相同的 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days )

这个新的配置设置可以解决问题吗?

消费者配置(auto.commit在 Spring Cloud Stream 框架上委托(delegate)):

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

经纪人配置:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

感谢和问候

最佳答案

您是对的,您遇到此问题的原因是 log.retention.*offsets.retention.* 的值不同(分别为 7 天和 1 天) Kafka 2.0之前版本请查看description here 。 这是由于很少有消息进入您的主题,并且偏移数据已经过期。

您的短语并不完全正确显然我们无法将消费者设置为“最新”。 如果您在不到 1 天前(例如几小时前)收到最后一条消息,您可以安全地将 auto.offset.reset 值更新为 latest,并使用相同的群组 ID (或application.id)。在这种情况下,您不会丢失消息。

作为另一个选项,您可以将特定主题的日志保留值更改为 1 天。 您也可以更新值 offsets.retention.*,但您需要从性能角度对其进行测试,它可能会降低。

关于java - Kafka Broker 偏移/日志保留和消费者偏移在最早模式下重置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53851114/

相关文章:

java - 为什么 Sonar 似乎忽略了@SuppressWarnings

java - 使用 docker 在 EC2 中部署时 Spring Boot 应用程序不可用 - 错误 404

apache-kafka - 启用 auto.create.topics 时 Kafka 删除主题

java - Kafka Connect API 和 Avro 对象(SourceRecord 与 org.apache.avro.Schema)

java - 与静态 block 同步

java - 如何加载和解析 SVG 文档

java - 将 List<Mono<String> 转换为 Flux<String>

java - 延迟 Kafka Streams 消费

java - GWT:如何使用 AsyncDataModel 填充 CellTree 的初始列表?

java - 在我们开始我们的移动+服务器项目之前,您能发表评论吗?