java - 所有分区的 seekToEnd 并在 Kafka 消费者的自动重新平衡中幸存下来

标签 java apache-kafka kotlin

当消费者组 A 的 Kafka 消费者连接到 Kafka 代理时,我想查找所有分区的末尾,即使在代理端存储了偏移量。如果更多的消费者正在为同一个消费者组连接,他们应该获取最新存储的偏移量。 我正在做以下事情:

consumer.poll(timeout) 
consumer.seekToEnd(emptyList())

while(true) {
  val records = consumer.poll(timeout)
  if(records.isNotEmpty()) {
    //print records
    consumer.commitSync()
  }
}

问题是当我连接消费者组 A 的第一个消费者 c1 时,一切都按预期工作,如果我连接消费者组 A 的另一个消费者 c2,该组正在重新平衡,c1 将消耗跳过的偏移量。

有什么想法吗?

最佳答案

您可以创建一个实现ConsumerRebalanceListener 的类,如下所示:

public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private Consumer<K, V> consumer;

    public AlwaysSeekToEndListener(Consumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
    }
}

然后在订阅主题时使用这个监听器:

consumer.subscribe(Collections.singletonList("test"), new AlwaysSeekToEndListener<String, String>(consumer));

关于java - 所有分区的 seekToEnd 并在 Kafka 消费者的自动重新平衡中幸存下来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45075147/

相关文章:

Java 开发环境 : Eclipse and NetBeans annoyances : any possible fixes or alternatives?

java - 如何通过 Selenium Java 初始化 PhantomJS 浏览器

apache-kafka - 消费者未收到消息,kafka 控制台,新的消费者 api,Kafka 0.9

java - 如何在安卓应用中设置最大音量?

java - 在 Java Android 应用程序中使用 Kotlin 代码

java - Guava 前提条件检查,空列表

java - 如何将/ibm_security_logout 的外部 JSP/HTML 片段嵌入到 JSF 中?

apache-spark - 如何将字节从 Kafka 转换为原始对象?

java - spring Kafka 负载类型的模糊方法

android - session cookie 不会与 JSoup 保持一致