spring-boot - Spring Kafka 即使我暂停消费者,也总是在 5 分钟后重新平衡

标签 spring-boot apache-kafka spring-kafka

有一个耗时的操作(大约10分钟),但是kafka在5分钟后重新平衡,即使我暂停了消费者。消费者方法:

    @KafkaListener(topics = {TopicAppoint.EXECUTE_SCHOOL_DATA_STATICS_TASK})
public void receiveMessage(@Payload String payload, Consumer<String, String> consumer) {

    Set<TopicPartition> assignment = consumer.assignment();
    consumer.pause(assignment);

    if (StringUtils.isNotEmpty(payload)) {
        SchoolStatisticsTaskDTO staticsTaskDTO = JSONObject.parseObject(payload, SchoolStatisticsTaskDTO.class);
        Optional<SchoolStatisticsTaskDO> taskOptional = schoolStatisticsTaskRepository.findById(staticsTaskDTO.getTrackId());
        taskOptional.ifPresent(schoolStaticsTaskDO -> {
            // handler
        });
    }

    consumer.resume(assignment);
}

这是我的配置:

  kafka:
bootstrap-servers: 192.168.0.230:9092
producer:
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  retries: 3
  properties:
    max.request.size: 12582912
consumer:
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  group-id: dc-fitness-data-consumer-group
  properties:
    max.partition.fetch.bytes: 12582912
  #enable-auto-commit: false
listener:
  ack-mode: record
  concurrency: 6

日志

  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
   13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.records.incremental.update.task-4, ft.es.records.incremental.update.task-5, ft.es.records.incremental.update.task-2, ft.es.records.incremental.update.task-3, ft.es.records.incremental.update.task-0, ft.es.records.incremental.update.task-1]
  13:09:20.219 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-25, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.220 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.student.batch.upload.task-17, ft.student.batch.upload.task-14, ft.student.batch.upload.task-13, ft.student.batch.upload.task-16, ft.student.batch.upload.task-15, ft.student.batch.upload.task-12]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#9-2-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.record.batch.upload.task-32, ft.record.batch.upload.task-31, ft.record.batch.upload.task-34, ft.record.batch.upload.task-33, ft.record.batch.upload.task-30, ft.record.batch.upload.task-35]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#8-5-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-18, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.221 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-18, ft.class.batch.upload.task-19, ft.class.batch.upload.task-20, ft.class.batch.upload.task-21, ft.class.batch.upload.task-22, ft.class.batch.upload.task-23]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.class.batch.upload.task-26, ft.class.batch.upload.task-27, ft.class.batch.upload.task-28, ft.class.batch.upload.task-29, ft.class.batch.upload.task-24, ft.class.batch.upload.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-26, ft.es.class.incremental.update.task-27, ft.es.class.incremental.update.task-28, ft.es.class.incremental.update.task-29, ft.es.class.incremental.update.task-24, ft.es.class.incremental.update.task-25]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#6-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-53, groupId=dc-fitness-data-consumer-group] (Re-)joining group
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] Revoking previously assigned partitions [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [ft.es.class.incremental.update.task-22, ft.es.class.incremental.update.task-23, ft.es.class.incremental.update.task-18, ft.es.class.incremental.update.task-19, ft.es.class.incremental.update.task-20, ft.es.class.incremental.update.task-21]
  13:09:20.223 [org.springframework.kafka.KafkaListenerEndpointContainer#5-4-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-47, groupId=dc-fitness-data-consumer-group] Attempt to heartbeat failed since group is rebalancing
  13:09:20.225 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-52, groupId=dc-fitness-data-consumer-group] (Re-)joining group

最佳答案

让我们看一下文档,从 pause() 方法开始

公共(public)无效暂停(集合分区) here

Suspend fetching from the requested partitions. Future calls to poll(long) will not return any records from these partitions until they have been resumed using resume(Collection). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.

所以从上面的描述来看,pause()方法会暂停分区获取消息,但不会暂停消费者线程,这意味着消费者线程将执行后续的 poll() 请求暂停的分区,但不会获取任何记录

在您的应用程序中:分区已暂停,但使用者线程正忙于执行耗时操作超过 10 分钟,而没有执行任何轮询请求。

检测消费者故障:因此当轮询停止时,心跳不会发送到集群

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown), then no heartbeats will be sent. If a period of the configured session timeout elapses before the server has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.

解决方案:增加以下属性的超时,因为默认时间为 5 分钟,您会看到每 5 分钟进行一次重新平衡(个人不支持增加超时)here

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.

解决方案2:如果您需要用更少的时间处理大量数据,请增加更多分区并用每个线程消耗每个分区(这意味着更多并发性),5 天内可以处理的数据更少分钟

关于spring-boot - Spring Kafka 即使我暂停消费者,也总是在 5 分钟后重新平衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53292655/

相关文章:

java - 授权 header 未显示在 CURL springdoc-openapi-ui 中

spring-boot - @AllArgsConstructor 和构造函数注入(inject) Spring : is private final needed?

java - 将 Collection 与 ReplyingKafkaTemplate 结合使用时,相关 ID 会丢失

java - spring-kafka 如何在后台运行消费者

hadoop - 我可以将API与STORM或KAFKA连接

java - Spring-Boot 和 Kafka : How to handle broker not available?

java - 使用 Spring Data R2DBC 进行批量插入时检索生成的 ID

spring - 列出所有部署的休息端点(spring-boot,tomcat)

amazon-web-services - 是否可以从 AWS 外部访问 Amazon Kinesis

java - spring-kafka 具有不同 Json 对象的多个 @KafkaListener 不起作用