elasticsearch - Logstash 5.1.1 kafka 输入不获取关于主题的现有消息

标签 elasticsearch apache-kafka logstash apache-zookeeper

我有以下带有 kafka 输入的 logstash 配置

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["mytopic"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
  }
}

我面临的问题是,当我运行 logstash 时,它不会获取关于该主题的旧消息。我的印象是,第一次运行 logstash 时,它会收集关于某个主题的所有未被消费的消息。我检查了这是一个新主题并且其中包含在 logstash 开始运行时未被拾取的消息。它确实会拾取运行时出现的有关该主题的消息,但不会拾取在其开始之前存在的消息。我是在配置中遗漏了什么,还是输入本身的怪癖。消息的保证对我的业务需求至关重要。

最佳答案

由于您没有为kafka指定组id,重要的注意事项如下:

  • Kafka group.id(logstash kafka 配置中的 group_id)设置为 logstash 的默认值,即“logstash”
  • logstash 中 enable.auto.commit (enable_auto_commit) 的默认 Kafka 值为“true”
  • Kafka auto.offset.reset (auto_offset_reset) 在 logstash 中没有默认值,所以我假设使用了最新的 Kafka 默认值。

因此,当您在某个主题上运行消费者时,它无法提取主题中已有的消息,可能会发生以下两种情况之一:

  1. 不存在与消费者具有相同组 ID 的现有组,因此使用最新的 Kafka 默认 auto.offset.reset 值,消费者将忽略已经存在的消息。
  2. 存在一个具有相同组 ID(“logstash”)的现有组,并且具有该组 ID 的某些消费者已经使用了现有消息并提交了偏移量(此其他消费者可能是您之前运行的消费者或某些消费者具有相同组 ID 的其他消费者)。这意味着该组下的其他消费者不会重新使用这些消息,除非以某种方式明确告知这样做。

所以你可能想要做的是设置一些 Kafka 配置,对于 logstash 你应该能够设置

group_id => "some_random_group"

auto_offset_reset => "earliest"

如果您现在运行消费者,因为 some_random_group 没有现有的偏移量并且重置最早,消费者应该消费主题中的所有现有消息并提交偏移量。这意味着如果在消费完所有消息后再次运行消费者,它将不会消费现有消息。

关于elasticsearch - Logstash 5.1.1 kafka 输入不获取关于主题的现有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42033193/

相关文章:

java - Kafka Consumer架构设计 : java plugin or external client

apache-kafka - 无法在 DC/OS 中启动 Confluent 2.0 (apache-kafka) Schema-Registry

elasticsearch - 在下一次索引轮换时更改 elasticsearch 索引的分片计数

logstash - 从logstash中的路径中提取字段

postgresql - 无法将logstash配置为postgres

elasticsearch - ElasticSearch快照-AlreadyClosedException已关闭

elasticsearch - Docker中的Elasticsearch 2.0.0集群Zen Zen发现

apache-kafka - 使用 Kafka 进行数据集成与更新和删除

elasticsearch - 如何在Elasticsearch中搜索偏重音关键字?

ruby-on-rails - ElasticSearch/Tire:如何正确设置部分单词搜索