elasticsearch - Kibana not showing the correct data when selecting by timestamp and received_at

Tags: elasticsearch logstash elastic-stack logstash-configuration kibana-6

My logstash.conf is below. Until now the data was being processed correctly, but today I am seeing a strange problem where the noi-syslog index does not show the correct syslog_timestamp.

input {
  file {
    path => [ "/scratch/rsyslog/*/messages.log" ]
    start_position => beginning
    sincedb_path => "/dev/null"
    max_open_files => 64000
    type => "noi-syslog"
  }
  file {
    path => [ "/scratch/rsyslog_CISCO/*/network.log" ]
    start_position => beginning
    sincedb_path => "/dev/null"
    max_open_files => 64000
    type => "apic_logs"
  }
}

filter {
  if [type] == "noi-syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  if [type] == "apic_logs" {
    grok {
      match => { "message" => "%{CISCOTIMESTAMP:syslog_timestamp} %{CISCOTIMESTAMP} %{SYSLOGHOST:syslog_hostname} (?<prog>[\w._/%-]+) %{SYSLOG5424SD:fault_code}%{SYSLOG5424SD:fault_state}%{SYSLOG5424SD:crit_info}%{SYSLOG5424SD:log_severity}%{SYSLOG5424SD:log_info} %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  if [type] == "noi-syslog" {
    elasticsearch {
      hosts => "noida-elk:9200"
      manage_template => false
      index => "noi-syslog-%{+YYYY.MM.dd}"
      document_type => "messages"
    }
  }
}

output {
  if [type] == "apic_logs" {
    elasticsearch {
      hosts => "noida-elk:9200"
      manage_template => false
      index => "apic_logs-%{+YYYY.MM.dd}"
      document_type => "messages"
    }
  }
}
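(Side note: Logstash concatenates all config sections, so the two output blocks above act as one pipeline output; the same routing could equally be written as a single output with one conditional. A sketch, equivalent to the config above:

output {
  if [type] == "noi-syslog" {
    elasticsearch {
      hosts => "noida-elk:9200"
      manage_template => false
      index => "noi-syslog-%{+YYYY.MM.dd}"
      document_type => "messages"
    }
  } else if [type] == "apic_logs" {
    elasticsearch {
      hosts => "noida-elk:9200"
      manage_template => false
      index => "apic_logs-%{+YYYY.MM.dd}"
      document_type => "messages"
    }
  }
})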
Index stats for apic_logs and noi-syslog:
$ curl -s -XGET http://127.0.0.1:9200/_cat/indices?v | egrep "apic_logs|noi"
green  open   noi-syslog-2019.03.13           Fz1Rht65QDCYCshmSjWO4Q   5   1    6845696            0      2.2gb            1gb
green  open   noi-rmlog-2019.03.13            W_VW8Y1eTWq-TKHAma3DLg   5   1     148613            0     92.6mb           45mb
green  open   apic_logs-2019.03.13            pKz61TS5Q-W2yCsCtrVvcQ   5   1    1606765            0    788.6mb        389.7mb

The Kibana page shows all fields correctly for the apic_logs index when selecting by @timestamp, but the same does not work for the Linux syslog index noi-syslog.

When selecting noi-syslog by @timestamp, not all fields are shown and the events carry a _grokparsefailure tag. On the other hand, when selecting the same noi-syslog index by received_at, all fields are shown, but the data is not up to date.

The following screenshot shows the selection by received_at (screenshot not reproduced).

The following screenshot shows the selection by @timestamp (screenshot not reproduced).
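(Editor's note: to gauge how widespread the parse failures are, the affected documents can be counted directly with the _count API; a quick sketch reusing the host and index pattern from the config above:

$ curl -s 'http://127.0.0.1:9200/noi-syslog-*/_count?q=tags:_grokparsefailure'

The count field in the response approximates how many noi-syslog events fell back to ingest time for @timestamp.)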

In the Logstash logs:
# tail -5 log-cohort_deprecation.log
[2019-03-13T20:16:29,112][WARN ][o.e.d.a.a.i.t.p.PutIndexTemplateRequest] [noida-elk.cadence.com] Deprecated field [template] used, replaced by [index_patterns]
[2019-03-13T20:16:30,548][WARN ][o.e.d.a.a.i.t.p.PutIndexTemplateRequest] [noida-elk.cadence.com] Deprecated field [template] used, replaced by [index_patterns]
[2019-03-13T20:19:45,935][WARN ][o.e.d.a.a.i.t.p.PutIndexTemplateRequest] [noida-elk.cadence.com] Deprecated field [template] used, replaced by [index_patterns]
[2019-03-13T20:19:48,644][WARN ][o.e.d.a.a.i.t.p.PutIndexTemplateRequest] [noida-elk.cadence.com] Deprecated field [template] used, replaced by [index_patterns]
[2019-03-13T20:20:13,069][WARN ][o.e.d.a.a.i.t.p.PutIndexTemplateRequest] [noida-elk.cadence.com] Deprecated field [template] used, replaced by [index_patterns]

Memory usage on the system:
             total       used       free     shared    buffers     cached
Mem:         32057      31794        263          0        210      18206
-/+ buffers/cache:      13378      18679
Swap:       102399        115     102284

Out of 32GB total memory I have allocated 8GB each to Elasticsearch and Logstash, and I wonder whether this could be the cause of the problem.
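(Editor's note: for reference, a minimal sketch of where those heap sizes would be set, assuming a package install; paths vary by distribution:

# /etc/elasticsearch/jvm.options  (and likewise /etc/logstash/jvm.options)
-Xms8g
-Xmx8g

Setting -Xms and -Xmx to the same value avoids heap resizing pauses.)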

Workaround that drops events tagged with _grokparsefailure:
filter {
  if [type] == "noi-syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}
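(Editor's note: an edited pipeline can be validated offline before reloading; a sketch assuming a package install of Logstash with the config at /etc/logstash/conf.d/logstash.conf:

$ /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/logstash.conf

The --config.test_and_exit flag parses the configuration and reports syntax errors, such as a missing closing brace, without starting the pipeline.)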

1 - Or alternatively, just an idea:
filter {
  if [type] == "noi-syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
        remove_field => [ "host", "path" ]
      }
    }
  }
}

2 - Or another option, just an idea. (Note: Logstash conditionals have no elif keyword and else takes no condition, so the version below is rewritten with nested ifs; the fallback grok clears the failure tag on success so that events still tagged afterwards can be dropped.)
filter {
  if [type] == "noi-syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
        add_field => [ "received_at", "%{@timestamp}" ]
        remove_field => [ "host", "path" ]
        # cleared only if this fallback pattern matches
        remove_tag => [ "_grokparsefailure" ]
      }
    }
    # anything still tagged failed both patterns, so drop it
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  }
}

Best Answer

The problem here is that your noi-syslog messages are not all alike, and your grok filter only works for the first kind. When grok fails to parse a message, it adds a tag named _grokparsefailure to the event.

Your first example, where grok works, is:

Mar 13 15:55:02 hostname /usr/bin/crontab[32708]: (root) LIST (root)

The second example, which makes grok fail, is:
Mar 12 11:01:02 hostname run-parts(/etc/cron.hourly)[3970 starting mcelog.cron

This second message is malformed: it is missing the closing square bracket (]) and the colon (:) after the PID (3970), so your grok pattern does not match.

Since grok failed, the syslog_timestamp field does not exist, so the date filter has nothing to work with, and @timestamp is set to the time the event entered the Logstash pipeline.

You need a grok pattern for each kind of message. A quick way to correct syslog_timestamp is to catch the failed messages and apply a second grok filter that extracts the syslog_timestamp field and puts the rest of the message into another field.

Try adding the following conditional to your pipeline:
if "_grokparsefailure" in [tags] {
  grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp } %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:rest_of_syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      remove_field => [ "host", "path" ]
   } 
}
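(Editor's note: placement matters here. Filters run in order, so for the recovered syslog_timestamp to drive @timestamp, this conditional has to run before the date filter. A sketch of the intended ordering inside the existing noi-syslog branch:

filter {
  if [type] == "noi-syslog" {
    grok {
      # primary pattern, as in the original config
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    if "_grokparsefailure" in [tags] {
      grok {
        # fallback pattern from this answer
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:rest_of_syslog_message}" }
      }
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
})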

The result of that grok will look like this:
{
  "syslog_hostname": "hostname",
  "syslog_timestamp": "Mar 12 11:01:02",
  "rest_of_syslog_message": "run-parts(/etc/cron.hourly)[3970 starting mcelog.cron"
}
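(Editor's note, not from the original answer: a common grok idiom avoids the conditional entirely, since a single grok filter accepts an array of patterns that are tried in order until one matches. A sketch combining the two patterns used above:

grok {
  match => { "message" => [
    "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
    "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:rest_of_syslog_message}"
  ] }
  add_field => [ "received_at", "%{@timestamp}" ]
  remove_field => [ "host", "path" ]
}

With this form, the event is only tagged _grokparsefailure if none of the listed patterns match.)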

A similar question about "elasticsearch - Kibana not showing the correct data when selecting by timestamp and received_at" can be found on Stack Overflow: https://stackoverflow.com/questions/55145787/
