elasticsearch - Logstash elapsed filter

Tags: elasticsearch logstash grok

I'm trying to use the elapsed.rb filter in my ELK stack and can't seem to figure it out. I'm not very familiar with grok, and I believe that's where my problem lies. Can anyone help?

Sample log file:

{
    "application_name": "Application.exe",
    "machine_name": "Machine1",
    "user_name": "testuser",
    "entry_date": "2015-03-12T18:12:23.5187552Z",
    "chef_environment_name": "chefenvironment1",
    "chef_logging_cookbook_version": "0.1.9",
    "logging_level": "INFO",
    "performance": {
        "process_name": "account_search",
        "process_id": "Machine1|1|635617555435187552",
        "event_type": "enter"
    },
    "thread_name": "1",
    "logger_name": "TestLogger",
    "@version": "1",
    "@timestamp": "2015-03-12T18:18:48.918Z",
    "type": "rabbit",
    "log_from": "rabbit"
}

{
    "application_name": "Application.exe",
    "machine_name": "Machine1",
    "user_name": "testuser",
    "entry_date": "2015-03-12T18:12:23.7527462Z",
    "chef_environment_name": "chefenvironment1",
    "chef_logging_cookbook_version": "0.1.9",
    "logging_level": "INFO",
    "performance": {
        "process_name": "account_search",
        "process_id": "Machine1|1|635617555435187552",
        "event_type": "exit"
    },
    "thread_name": "1",
    "logger_name": "TestLogger",
    "@version": "1",
    "@timestamp": "2015-03-12T18:18:48.920Z",
    "type": "rabbit",
    "log_from": "rabbit"
}

Sample .conf file:

input {
  rabbitmq {
    host => "SERVERNAME"
    add_field => ["log_from", "rabbit"]
    type => "rabbit"
    user => "testuser"
    password => "testuser"
    durable => "true"
    exchange => "Logging"
    queue => "testqueue"
    codec => "json"
    exclusive => "false"
    passive => "true"
  }
}


filter {

   grok {
     match => ["message", "%{TIMESTAMP_ISO8601} START id: (?<process_id>.*)"]
     add_tag => [ "taskStarted" ]
   }

   grok {
     match => ["message", "%{TIMESTAMP_ISO8601} END id: (?<process_id>.*)"]
     add_tag => [ "taskTerminated"]
   }

   elapsed {
    start_tag => "taskStarted"
    end_tag => "taskTerminated"
    unique_id_field => "process_id"
    timeout => 10000
    new_event_on_match => false
  }
}

output {
  file {
    codec => json { charset => "UTF-8" }
    path => "test.log"
  }
}

Best answer

You don't need the grok filters, since your input is already in JSON format. What you need is something like this:

if [performance][event_type] == "enter" {
  mutate { add_tag => ["taskStarted"] }
} else if [performance][event_type] == "exit" {
  mutate { add_tag => ["taskTerminated"] }
}
elapsed {
  start_tag => "taskStarted"
  end_tag => "taskTerminated"
  unique_id_field => "performance.process_id"
  timeout => 10000
  new_event_on_match => false
}
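
With new_event_on_match => false, the elapsed filter attaches its result to the end ("exit") event rather than emitting a new one; the plugin adds an elapsed_time field (the difference between the two timestamps, in seconds) and tags the event, using "elapsed" and "elapsed_match" tags. A minimal debugging sketch, assuming you want to print only the matched events to the console while testing:

output {
  # Only print events the elapsed filter actually matched
  if "elapsed" in [tags] {
    stdout { codec => rubydebug }
  }
}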

I'm not completely sure about the unique_id_field. I think it should work as written, but if it doesn't, you can switch to a plain top-level process_id field by first copying the nested value up with add_field => { "process_id" => "%{[performance][process_id]}" }, as shown in the sketch below.
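
Put together, that fallback variant of the filter would look like this (an untested sketch; the mutate/add_field step and the top-level process_id field are the workaround described above, not part of the original config):

filter {
  if [performance][event_type] == "enter" {
    mutate { add_tag => ["taskStarted"] }
  } else if [performance][event_type] == "exit" {
    mutate { add_tag => ["taskTerminated"] }
  }
  # Copy the nested value to a top-level field so elapsed can correlate on it
  mutate { add_field => { "process_id" => "%{[performance][process_id]}" } }
  elapsed {
    start_tag => "taskStarted"
    end_tag => "taskTerminated"
    unique_id_field => "process_id"
    timeout => 10000
    new_event_on_match => false
  }
}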

Regarding "elasticsearch - Logstash elapsed filter", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29020545/
