spring-boot - Spring kafka,ConsumerSeekAware接口(interface),再平衡时是否调用onPartitionsAssigned方法

标签 spring-boot spring-kafka

重新平衡时是否调用 ConsumerSeekAware 接口(interface)、onPartitionsAssigned 方法。因为我想在初始化和重新平衡时寻找偏移量中的特定偏移量。我可以将 consumerSeekAware 用于这两个目的,还是应该将 ConsumerRebalanceListener 用于重新平衡目的。请给出简单的答案,因为我对 spring kafka 还没有深入的了解。如果可以,请提供示例代码。谢谢

最佳答案

ConsumerSeekAware有这个方法:

/**
 * When using group management, called when partition assignments change.
 * @param assignments the new assignments and their current offsets.
 * @param callback the callback to perform an initial seek after assignment.
 */
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

它是从 KafkaMessageListenerContainer.seekPartitions(Collection<TopicPartition> partitions, boolean idle) 调用的。 ,而这又来自 ConsumerRebalanceListener.onPartitionsAssigned()内部实现。最后一个有这个JavaDocs:
 * A callback method the user can implement to provide handling of customized offsets on completion of a successful
 * partition re-assignment. This method will be called after an offset re-assignment completes and before the
 * consumer starts fetching data.

所以,是的,ConsumerSeekAware.onPartitionsAssigned()在重新平衡期间总是调用。顺便说一句,Apache Kafka 没有像 initializing 这样的状态。 .总是 rebalancing - 代理处于等待状态,并在有新消费者加入时开始重新平衡。

关于spring-boot - Spring kafka,ConsumerSeekAware接口(interface),再平衡时是否调用onPartitionsAssigned方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48981743/

相关文章:

java - Apache Kafka : 3 partitions, 消费者组中有3个消费者,每个消费者应该是多线程的

java - Spring Kafka - 在运行时订阅新主题

spring-boot - 带有Spring Boot和Kafka的Docker内部的Truststore的问题路径

java - 如何查询具有未定义深度的分层类别树实体

java - Spring无法在实现Quartz Job的类中注入(inject)Bean

java - spring-kafka 中未应用最小获取字节数属性

java - Kafka AdminClientConfig 忽略提供的配置

spring-boot - Spring-找到多个Spring Data模块,进入严格的存储库配置模式

Java Springboot Hibernate 设置 Oracle 数据库 session 参数

java - 防止 Swagger 自动添加某些模型