elasticsearch - 如何通过 LogStash 过滤简单消息到 ElasticSearch 将消息划分为多个字段

标签 elasticsearch logstash logstash-grok logstash-configuration logstash-file

这是输入文件:

{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T16:08:37.861Z"}
{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T19:40:17.682Z"}

基本上,这样的日志是我的 NodeJs 应用程序通过 Winston 模块生成的结果。我的疑问集中在如何调整 logstash 过滤器,以便在 ElasticSearch 中创建出 4 个字段。

我的意图是查看这些“列”(我猜在 ElasticSearch 上下文中,属性或字段可能是更恰当的词):级别(例如 error)、消息来源(例如 clientErrorHandler)、消息内容(例如 Erro não ...serviços)以及不含纳秒的错误时间(例如 2017-04-06T19:40:17)。

我被困在这一点上:

1 - 我使用了这个 logstash.conf
input {
    file {
         path => "/home/demetrio/dev/testes_manuais/ELK/logs/*"
         start_position => "beginning"

   }
}

filter {

  grok {
        match => {
        "message" => '%{SYSLOG5424SD:loglevel} %{TIMESTAMP_ISO8601:Date} %{GREEDYDATA:content}'
      }
  }

  date {
    match => [ "Date", "YYYY-mm-dd HH:mm:ss.SSS" ]
    locale => en
  }

}

output {
  stdout {
    codec => plain {
                        charset => "ISO-8859-1"
                }

    }
    elasticsearch {
        hosts => "http://127.0.0.1:9200"
        index => "dmz-logs-indice"

  }
}

2 - 通过 Kibana DevTools 搜索 ElasticSearch
GET _search
{
  "query": {
    "match_all": {}
  }
}

我看到了:
{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 6,
    "successful": 6,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 1,
    "hits": [
      {
        "_index": ".kibana",
        "_type": "config",
        "_id": "5.3.0",
        "_score": 1,
        "_source": {
          "buildNum": 14823
        }
      },
      {
        "_index": "dmz-logs-indice",
        "_type": "logs",
        "_id": "AVtJLZ5x6gscWn5fxxA_",
        "_score": 1,
        "_source": {
          "path": "/home/demetrio/dev/testes_manuais/ELK/logs/logs.log",
          "@timestamp": "2017-04-07T16:09:36.996Z",
          "@version": "1",
          "host": "nodejs",
          "message": """{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T16:08:37.861Z"}""",
          "tags": [
            "_grokparsefailure"
          ]
        }
      },
      {
        "_index": "dmz-logs-indice",
        "_type": "logs",
        "_id": "AVtJLZ5x6gscWn5fxxBA",
        "_score": 1,
        "_source": {
          "path": "/home/demetrio/dev/testes_manuais/ELK/logs/logs.log",
          "@timestamp": "2017-04-07T16:09:36.998Z",
          "@version": "1",
          "host": "nodejs",
          "message": """{"meta":"","level":"error","message":"clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.","timestamp":"2017-04-06T19:40:17.682Z"}""",
          "tags": [
            "_grokparsefailure"
          ]
        }
      }
    ]
  }
}

我想我应该使用一些正则表达式(Regular Expression)或 Grok 将消息分成四个部分:

1 - 级别
2 - “:”之前的消息部分
3 - “:”之后的消息部分
4 - 时间戳

并且,如果可能,提供更好的列(字段/属性)标签,例如:

1 - 级别
2 - 消息源
3 - 消息内容
4 - 错误时间

最后,去掉时间戳中的纳秒部分

PS. 以防将来的读者对我如何在 NodeJs 中记录日志感兴趣,相关代码如下:

...
var winston = require('winston');
winston.emitErrs = true;

var logger = new winston.Logger({
    transports: [
        new winston.transports.File({
            level: 'error',
            filename: './logs/logs.log',
            handleExceptions: true,
            json: true,
            maxsize: 5242880, //5MB
            maxFiles: 5,
            colorize: false,
            prettyPrint: true
        })               
    ],
    exitOnError: false
});

...

function clientErrorHandler(err, req, res, next) {
      logger.log("error","clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.",err.message);

      res.send(500, { error: 'Erro genérico!' });

  }

app.use(clientErrorHandler);

PS2:我仔细阅读了 Filter specific Message with logstash before sending to ElasticSearch 之类的问题,但我仍然被困住了。

最佳答案

由于您的应用程序将日志输出为 JSON 字符串,因此您可以配置 Logstash 将日志解析为 JSON。这很简单,只需在文件输入配置中添加 codec => "json" 即可。

以下是您的方案的示例配置:

input {
  file {
    path => "/home/demetrio/dev/testes_manuais/ELK/logs/*"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  # This matches `timestamp` field into `@timestamp` field for Kibana to consume.
  date {
    match => [ "timestamp", "ISO8601" ]
    remove_field => [ "timestamp" ]
  }
}

output {
  stdout {
    # This codec gives your more details about the event.
    codec => rubydebug
  }

  elasticsearch {
    hosts => "http://127.0.0.1:9200"
    index => "dmz-logs-indice"
  }
}

这是来自 Logstash 的 stdout 示例输出:
{
          "path" => "/home/demetrio/dev/testes_manuais/ELK/logs/demo.log",
    "@timestamp" => 2017-04-06T19:40:17.682Z,
         "level" => "error",
          "meta" => "",
      "@version" => "1",
          "host" => "dbf718c4b8e4",
       "message" => "clientErrorHandler: Erro não previsto ou mapeado durante chamada dos serviços.",
}

关于elasticsearch - 如何通过 LogStash 过滤简单消息到 ElasticSearch 将消息划分为多个字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43283214/

相关文章:

elasticsearch 日期范围查询 0 次点击

elasticsearch - Logstash读取文件/文档

elasticsearch - 如何在logstash中基于grok创建过滤器

regex - 使用Logstash解析范围请求 header

elasticsearch - stormcrawler:indexer.md.mapping-如果元数据标记不存在会发生什么?

elasticsearch - 使用不同的过滤器和编解码器将多个kafka主题输入到logstash

amazon-web-services - 从 AWS mysql RDS 获取数据到 AWS Elasticsearch 的最简单方法?

elasticsearch - 使用grok过滤Logstash中的Apache错误日志

elasticsearch - Logstash上的单独索引

elasticsearch - Logstash 根据现有字段创建新字段