scala - Kafka - 为什么在将 AUTO_OFFSET_RESET_CONFIG 设置为 "latest"时,新的 groupId 不返回主题中的所有消息

标签 scala apache-kafka

我尝试在 scala 中实现一个非常简单的 Kafka (0.9.0.1) 消费者(代码如下)。

据我了解,Kafka(或者更确切地说是 Zookeeper)为每个 groupId 存储给定主题的最后一条消费消息的偏移量。所以给定以下场景:

  1. Consumer with groupId1 昨天消费了唯一的 5 主题中的消息。现在最后消费的消息有偏移量 4(考虑到 偏移量为 0) 的第一条消息
  2. 晚上有 2 条新消息到达该主题
  3. 今天重启消费者,使用相同的groupId1,会有 有两种选择:

选项 1:如果我将以下属性设置为 "latest",消费者将阅读夜间到达的最后 2 条新消息:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

选项 2:如果我将以下属性设置为 "earliest",消费者将阅读主题中的所有 7 条消息:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

问题:出于某种原因,如果我将消费者的 groupId 更改为 groupId2,这是给定主题的新 groupId,因此它从未消费过任何消息之前,它的最新偏移量应该是 0。我期待通过设置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

消费者将在第一次执行期间读取主题中存储的所有消息(相当于最早)。然后对于接下来的执行,它将只消耗新的。然而,事实并非如此。

如果我设置一个新的 groupId 并将 AUTO_OFFSET_RESET_CONFIG 保持为 latest,消费者将无法阅读任何消息。然后我需要做的是在第一次运行时将 AUTO_OFFSET_RESET_CONFIG 设置为 earliest,一旦 groupID 已经有一个不同于 0 的偏移量,我就可以移动到 最新的

我的消费者应该是这样的吗?有没有比在我第一次运行消费者后切换 AUTO_OFFSET_RESET_CONFIG 更好的解决方案?

下面是我作为一个简单的消费者使用的代码:

class KafkaTestings {

  val brokers = "listOfBrokers"
  val groupId = "anyGroupId"
  val topic = "anyTopic"

  val props = createConsumerConfig(brokers, groupId)

  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic))

    Executors.newSingleThreadExecutor.execute(    new Runnable {
      override def run(): Unit = {

        while (true) {
          val records = consumer.poll(1000)

          for (record <- records) {
            println("Record: "+record.value)
          }

        }

      }
    })
  }
}

object ScalaConsumer extends App {
  val testConsumer = new KafkaTestings()
  testConsumer.run()
} 

This被用作编写这个简单消费者的引用

最佳答案

这是按照记录工作的。

如果你开始一个新的消费者组(即 Kafka 中没有存储现有偏移量的消费者组),你必须选择消费者是否应该从最早的可能消息开始(主题中仍然可用的最旧消息)或来自最新的(仅从现在开始产生的消息)。

Is there a better solution than switching the AUTO_OFFSET_RESET_CONFIG after the first time I run the consumer?

您可以将它保持在 EARLIEST,因为当您第二次运行消费者时,它已经存储了偏移量,只需从那里获取。重置策略仅在创建新的消费者组时使用。

Today I restart the consumer, with the same groupId1, there will be two options:

不是真的。由于消费者组在前一天运行,它将找到其提交的偏移量并从中断的地方继续。因此,无论您将重置策略设置为什么,它都会收到这两条新消息。


虽然知道,Kafka 不会永远存储这些偏移量,但我相信默认值只是一周。因此,如果您关闭消费者的时间超过这个时间,偏移量可能会过时,并且您可能会意外重置为 EARLIEST(这对于大型主题来说可能代价高昂)。鉴于此,无论如何将其更改为 LATEST 可能是谨慎的做法。

关于scala - Kafka - 为什么在将 AUTO_OFFSET_RESET_CONFIG 设置为 "latest"时,新的 groupId 不返回主题中的所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57195132/

相关文章:

spring-boot - 已请求默认 Binder ,但没有可用于 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' 的 Binder

apache-kafka - Zookeeper : java. io.IOException : No snapshot found, 但有日志条目。有东西坏了

docker - 如何配置 docker-compose 以便将磁盘挂载到文件夹,而不是本地磁盘

scala - 如何在 Play ScalaTest 上生成 HTML 报告

java - 使用 Spray 访问 GAE 数据存储

scala - 如果在Scala中找不到键,如何使用键访问 map 的值?

scala - Scala流的功能处理而不会出现OutOfMemory错误

javascript - Kafka 这个主题分区没有领导者,因为我们正在进行领导选举

Spring云流0​​x104567910

list - 查找列表 scala 中元素的索引