elasticsearch - 过滤数据时出现logstash grok问题

标签 elasticsearch logstash logstash-grok

我有一个基本上用于通过rm命令删除数据的数据,如下所示。

ttmv516,19/05/21,03:59,00-mins,dvcm,dvcm 166820 4.1 0.0 4212 736 ? DN 03:59 0:01 rm -rf /dv/project/agile/mce_dev_folic/test/install.asan/install,/dv/svgwwt/commander/workspace4/dvfcronrun_IL-SFV-RHEL6.5-K4_kinite_agile_invoke_dvfcronrun_at_given_site_50322

我正在下面使用logstash grok,它工作正常,但直到最近我看到两个奇怪的问题1)_grokparsefailure另外2)Hostname Field没有正确显示,即它的初始字符不像ttmv516会像mv516一样出现。
%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}

但是,使用kokana数据中的grok Debugger进行的测试正确显示。

enter image description here

我的logstash文件如下。
cat /etc/logstash/conf.d/rmlog.conf
input {
  file {
    path => [ "/data/rm_logs/*.txt" ]
    start_position => beginning
    sincedb_path => "/data/registry-1"
    max_open_files => 64000
    type => "rmlog"
  }
}

filter {
  if [type] == "rmlog" {
    grok {
     match => { "message" => "%{HOSTNAME:Hostname},%{DATE:Date},%{HOUR:dt_h}:%{MINUTE:dt_m},%{NUMBER:duration}-%{WORD:hm},%{USER:User},%{USER:User_1} %{NUMBER:Pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:Num_1} %{NUMBER:Num_2} %{DATA} (?:%{HOUR:dt_h1}:|)(?:%{MINUTE:dt_m1}|) (?:%{HOUR:dt_h2}:|)(?:%{MINUTE:dt_m2}|)%{GREEDYDATA:CMD},%{GREEDYDATA:PWD_PATH}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
   }
  }
 }
output {
        if [type] == "rmlog" {
        elasticsearch {
                hosts => ["myhost.xyz.com:9200"]
                manage_template => false
                index => "pt-rmlog-%{+YYYY.MM.dd}"
  }
 }
}

任何帮助建议将不胜感激。

编辑:

根据我的观察其失败的消息..
ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373
ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373

但是,我试图用下面的条件来编辑grok,但是它仍然删除了一些字段。
input {
  file {
    path => [ "/data/rm_logs/*.txt" ]
    start_position => beginning
    max_open_files => 64000
    sincedb_path => "/data/registry-1"
    type => "rmlog"
  }
}
filter {
  if [type] == "rmlog" {
    grok {
     match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},%{NUMBER:duration}-%{WORD:hm},%{USER:user},%{USER:group} %{NUMBER:pid} %{NUMBER:float} %{NUMBER:float} %{NUMBER:num_1} %{NUMBER:num_2} %{DATA} (?:%{HOUR:time_h1}:|)(?:%{MINUTE:time_m1}|) (?:%{HOUR:time_h2}:|)(?:%{MINUTE:time_m2}|)%{GREEDYDATA:cmd},%{GREEDYDATA:pwd}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
   }
  }
  if "_grokparsefailure" in [tags] {
    grok {
      match => { "message" => "%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "@version", "host", "message", "_type", "_index", "_score" ]
  }
 }
}
output {
        if [type] == "rmlog" {
        elasticsearch {
                hosts => ["myhost.xyz.com:9200"]
                manage_template => false
                index => "pt-rmlog-%{+YYYY.MM.dd}"
  }
 }
}

注意:看来_grokparsefailure标记可在以下消息上使用,但在其他消息上仍然无法使用。

1)这个工作..
 ttmv541,19/05/21,03:43,-mins,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv541.373

ttmv540,19/05/21,03:59,00-hrs,USER,USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND,/local/ntr/ttmv540.373

2)文本日志的第二行失败,因为它具有关联的00-hrs号,现在在grok ..以下不能同时满足两个条件。
%{HOSTNAME:hostname},%{DATE:date},%{HOUR:time_h}:%{MINUTE:time_m},-%{WORD:duration},%{USER:user},%{USER:group}%{GREEDYDATA:cmd}

最佳答案

我将处理分为两部分,一部分处理主机名和时间戳问题,另一部分处理其余的行。我发现这使维护更加容易。

因此,您剩下了以下两个输入:

ttmv541,19/05/21,03:43,-mins
ttmv540,19/05/21,03:59,00-hrs

您的两种模式将与前几部分很好地匹配,所以问题是您想在时间之后解析出这些内容。在原始模式中,数字部分使用duration,单位使用hm。在第二种模式中,您似乎将单位放入duration,这可能不正确。

没有更多信息,持续时间似乎是可选的,但是您将始终拥有单位。这可以反射(reflect)在您的模式中,例如:
(%{NUMBER:duration})?-%{WORD:hm}

还要注意,如果最终需要多种模式,则不必依赖grokparsefailure来使用它们-match-> message可以采用数组。有关示例,请参见the doc

关于elasticsearch - 过滤数据时出现logstash grok问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56231597/

相关文章:

elasticsearch - 在Elasticsearch中查询数组中的部分匹配

elasticsearch - logstash将字符串转换为日期

csv - 从 CSV 文件输入数据到 logstash

elasticsearch - grok模式从日志消息中提取数据

elasticsearch - AND查询嵌套对象

elasticsearch - 限制较少的搜索不会在ElasticSearch中返回任何匹配

elasticsearch - Elasticsearch取代了Memcached

elasticsearch - 使用 Elasticsearch /kibana/logstash 集中 pm2 日志

elasticsearch - 如何覆盖来自 logstash 中的 json 的时间戳字段

elasticsearch - ELK堆栈中的新字段-Logstash