我有以下带有 kafka 输入的 logstash 配置
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["mytopic"]
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
doc_as_upsert => true
action => "update"
}
}
我面临的问题是,当我运行 logstash 时,它不会获取关于该主题的旧消息。我的印象是,第一次运行 logstash 时,它会收集关于某个主题的所有未被消费的消息。我检查了这是一个新主题并且其中包含在 logstash 开始运行时未被拾取的消息。它确实会拾取运行时出现的有关该主题的消息,但不会拾取在其开始之前存在的消息。我是在配置中遗漏了什么,还是输入本身的怪癖。消息的保证对我的业务需求至关重要。
最佳答案
由于您没有为kafka指定组id,重要的注意事项如下:
- Kafka group.id(logstash kafka 配置中的 group_id)设置为 logstash 的默认值,即“logstash”
- logstash 中 enable.auto.commit (enable_auto_commit) 的默认 Kafka 值为“true”
- Kafka auto.offset.reset (auto_offset_reset) 在 logstash 中没有默认值,所以我假设使用了最新的 Kafka 默认值。
因此,当您在某个主题上运行消费者时,它无法提取主题中已有的消息,可能会发生以下两种情况之一:
- 不存在与消费者具有相同组 ID 的现有组,因此使用最新的 Kafka 默认 auto.offset.reset 值,消费者将忽略已经存在的消息。
- 存在一个具有相同组 ID(“logstash”)的现有组,并且具有该组 ID 的某些消费者已经使用了现有消息并提交了偏移量(此其他消费者可能是您之前运行的消费者或某些消费者具有相同组 ID 的其他消费者)。这意味着该组下的其他消费者不会重新使用这些消息,除非以某种方式明确告知这样做。
所以你可能想要做的是设置一些 Kafka 配置,对于 logstash 你应该能够设置
group_id => "some_random_group"
auto_offset_reset => "earliest"
如果您现在运行消费者,因为 some_random_group 没有现有的偏移量并且重置最早,消费者应该消费主题中的所有现有消息并提交偏移量。这意味着如果在消费完所有消息后再次运行消费者,它将不会消费现有消息。
关于elasticsearch - Logstash 5.1.1 kafka 输入不获取关于主题的现有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42033193/