elasticsearch - 使用 Logstash 计算特定日志事件的持续时间

标签 elasticsearch timestamp logstash kibana

目标:我想使用logstash计算特定事件持续的时间。

场景:假设一位客户正在我的页面上搜索要购买的产品。他访问的每个页面及其持续时间都记录在日志中。现在我想了解普通客户需要多长时间才能获得产品。以及我的服务器需要多长时间才能回复他。

现在这是我的日志文件:
16-09-2004 00:37:22 BEGIN_CUST 时间:16-09-2004T00:37:26+05:30 ID-XYZ456 2004 年 9 月 16 日 00:37:23 PAGE_1 ID-XYZ456 时间:16-09-2004T00:39:26+05:30 2004 年 9 月 16 日 00:37:23 PAGE_2 ID-XYZ456 时间:16-09-2004T00:41:26+05:30 2004 年 9 月 16 日 00:37:23 但请求 ID-XYZ456 时间:16-09-2004T00:43:26+05:30 16-09-2004 00:37:23 购买 ID-XYZ456 时间:16-09-2004T00:47:26+05:30 16-09-2004 00:51:22 BEGIN_CUST 时间:16-09-2004T00:52:26+05:30 ID-YUB98I 2004 年 9 月 16 日 00:53:23 PAGE_1 ID-YUB98I 16-09-2004 00:55:23 购买 ID-YUB98I

在上面的日志文件中,很明显,BEGIN_CUST 是事件的开始,PURCHASE 是事件的结束。
ID(充当每个客户的唯一ID)。

我尝试过脚本化字段。但由于以下几点,它没有给我带来正确的结果,

  • 客户不一定需要购买它。
  • 客户购买甚至可能会持续几秒钟。

是否有更好的方法可以在 Kibana 的单独字段中绘制个人客户的持续时间,以便使用 Logstash 对其进行可视化。

提前致谢。

最佳答案

只要您使用 ElasticSearch 作为您的商店,the elasticsearch filter可能会做你需要的事情。诀窍是在收到 PURCHASE 事件后立即搜索 BEGIN_CUST 事件。该插件的文档包括一个示例,可以完成您正在寻找的大部分内容,但这里是一个摘要:

if [trans_type] == "PURCHASE" {
  elasticsearch {
    hosts => localhost,
    query => "trans_type:BEGIN_CUST AND cust_id:%{[cust_id]}],
    fields => { "@timestamp" => "started" }
  }
  date {
    match => [ "[started]", "ISO8601" ]
    target => "[started]"
  }
  ruby {
    code => "event['shopping_time'] = (event['@timestamp'] - event['started'] rescue nil"
  }
}

这将产生一个 shopping_time 字段,以秒为单位测量 BEGIN_CUST 记录到达时和第一个 PURCHASE 到达之间的时间。如果客户购买两次,则每条 PURCHASE 记录都将拥有基于相同 BEGIN_CUST 的自己的 shopping_time 字段。

其工作原理是在 ElasticSearch 中查询 BEGIN_CUST 记录,并在 PURCHASE 记录的 started 字段中使用该记录的 @timestamp 数据。然后,date {} 过滤器将其转换为日期时间数据类型。最后,ruby {} block 计算当前 @timestamp 字段与从 ElasticSearch 中提取的字段之间的时间差,从而创建 shopping_time字段。

关于elasticsearch - 使用 Logstash 计算特定日志事件的持续时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44757545/

相关文章:

Elasticsearch - 没有可用的节点

elasticsearch - 如何在Java的Elasticsearch中获取文档的父ID?

java - 有没有办法用 Java 中的 TimeStamp 获取始终为 2 位数字的值?

mysql - 为什么在 DEFAULT 子句中只能有一个带有 CURRENT_TIMESTAMP 的 TIMESTAMP 列?

elasticsearch - Logstash与模式不匹配

lucene - 如何将文档索引到特定的ElasticSearch分片?

c# - Serilog + ElasticSearch Sink 连接问题(.net core)

apache-spark - Apache Spark 从时间戳列中减去天数

ruby - ruby 代码Logstash中的功能

elasticsearch - 使用 Kafka 和 ELK 堆栈进行集中式日志记录