apache-kafka - 我可以在不检索所有消息的情况下检索 Kafka 分区的最新可用偏移量吗?

标签 apache-kafka kafka-consumer-api

查看最新的 (v0.10) Kafka Consumer documentation :

“消费者的 位置 给出了下一条将发出的记录的偏移量。它将比消费者在该分区中看到的最高偏移量大一个。每次消费者收到时它都会自动前进数据调用 poll(long) 并接收消息。”

有没有办法查询服务器端分区可用的最大偏移量,没有 检索所有消息?

我试图实现的逻辑如下:

  • 每秒查询主题中待处理消息的数量 (A)
  • 如果 A > 阈值,则唤醒将继续检索所有消息并处理它们的处理器
  • 否则什么都不做( sleep 1)

  • 动机是我需要做一些批处理,但我希望处理器仅在有足够数据时唤醒(并且我不想两次检索所有数据)。

    最佳答案

    您可以使用 Consumer.seekToEnd()方法,运行 Consumer.poll(0)使其生效但立即返回,然后 Consumer.position()查找所有订阅(或分配)主题分区的位置。这些将是所有分区的当前最终偏移量。这也将开始从代理获取这些偏移量的一些数据,但如果您随后返回到不同的位置,任何返回的数据都将被忽略。

    目前,正如 serejja 所提到的,替代方案是使用旧的简单消费者,尽管该过程相当复杂,因为您需要手动查找每个分区的领导者。

    关于apache-kafka - 我可以在不检索所有消息的情况下检索 Kafka 分区的最新可用偏移量吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38684813/

    相关文章:

    java - 无法从与 StateStore 不同的应用程序访问 KTable

    spring-boot - 在 Kafka 监听器线程中生成记录时获取 ProducerFencedException

    apache-kafka - Kafka 异步提交偏移复制

    elasticsearch - 如何使用logstash将Kafka主题键值索引为字段?

    java - Kafka 0.10 Java 客户端 TimeoutException : Batch containing 1 record(s) expired

    java - Kafka - 关闭(kafka.server.KafkaServer),启动 Kafka-Server-Start 时出现问题

    c# - Kafka Confluent 库中 poll 和 consume 的区别

    java - 获取分配给 Kafka 分区的消费者或客户端 ID

    apache-kafka - kafka streams groupBy 聚合产生意外值

    java - flink - 集群不使用集群