Fluentd:如何覆盖时间属性

标签 fluentd

Guyz……我们被困住了……救救我们! :-)

我们有一个使用 Fluentd 的 3 步日志聚合管道。

[#1 - 尾部日志(原始日志)] --(TCP)--> [#2 - 将读取的日志解析为 JSON] --(TCP)--> [#3 - 过滤并输出到 Redis & Mongo]

我们不会在第一步中将尾部日志转换为 JSON。这主要是因为我们希望避免该服务器上的任何额外 CPU 消耗。我们拥有的日志行非常复杂,并且有意将解析推迟到第 2 步(在不同的集群/服务器上)。

所以第 1 阶段发出:时间、标签和记录(原始日志行)。我们在这里使用 in_tail 插件,因此默认情况下 'time' 属性表示从文件中读取记录的时间。因此,在负载下读取时间可能与日志行的实际时间戳不匹配。

JSON 解析推迟到第二阶段。

在第二阶段,一旦我们将日志转换为 JSON……我们希望将阶段 #1 发送的“时间”属性覆盖为来自 JSON 记录的时间属性。

我们在步骤 #2 ( https://github.com/tagomoris/fluent-plugin-parser ) 中使用 Fluent-Plugin-Parser。

我们如何覆盖 time 属性并使 FluentD 使用它而不是在步骤 1 中读取的“时间”?

最佳答案

是的,您可以使用 fluent-plugin-parser 的未记录功能“time_key”来做到这一点,如下所示:

<source>
  type exec
  run_interval 3s
  format json
  command echo '{"message":"hello,2013-03-03 12:00:13"}'
  tag first
</source>

<match first>
  type parser
  key_name message
  time_key my_time
  time_format %Y-%m-%d %H:%M:%S
  format /^(?<some_field>[^,]*),(?<my_time>.*)/
  tag second
</match>

<match second>
  type stdout
</match>

上面的代码片段的作用是:
  • 生成消息 {"message":"hello,2013-03-03 12:00:13"}每 3 秒,标签为“first”。这是为了测试。
  • 它与 <match first> 匹配.然后,解析器插件使用正则表达式解析名为“message”的字段。在您的情况下,它将是 format json .
  • time_key my_time告诉解析器插件在“message”字段的解析值中寻找一个字段,如果存在,它用time_format %Y-%m-%d %H:%M:%S解析该字段。 . 从现在开始,这是新的时间
  • 最后,我输出到标准输出。

  • 如果你运行上面的 conf,你应该得到这样的输出:
    root@ae4a398d41ef:/home/fluentd# fluentd -c fluent.conf
    2014-05-31 00:01:19 +0000 [info]: starting fluentd-0.10.46
    2014-05-31 00:01:19 +0000 [info]: reading config file path="fluent.conf"
    2014-05-31 00:01:19 +0000 [info]: gem 'fluent-plugin-parser' version '0.3.4'
    2014-05-31 00:01:19 +0000 [info]: gem 'fluentd' version '0.10.46'
    2014-05-31 00:01:19 +0000 [info]: using configuration file: <ROOT>
      <source>
        type exec
        run_interval 3s
        format json
        command echo '{"message":"hello,2013-03-03 12:00:13"}'
        tag first
      </source>
      <match first>
        type parser
        key_name message
        time_key my_time
        time_format %Y-%m-%d %H:%M:%S
        format /^(?<some_field>[^,]*),(?<my_time>.*)/
        tag second
      </match>
      <match second>
        type stdout
      </match>
    </ROOT>
    2014-05-31 00:01:19 +0000 [info]: adding source type="exec"
    2014-05-31 00:01:19 +0000 [info]: adding match pattern="first" type="parser"
    2014-05-31 00:01:19 +0000 [info]: adding match pattern="second" type="stdout"
    2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
    2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
    2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
    2013-03-03 12:00:13 +0000 second: {"some_field":"hello"}
    

    关于Fluentd:如何覆盖时间属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23958390/

    相关文章:

    logging - 将集中式日志记录添加到使用systemd的kubernetes集群

    fluentd - 如何在fluidd的输出路径中使用tag?

    fluentd:多个过滤器和匹配的一个来源

    java - 如何解析 fluidd 中输入的多行 java ERROR/Exception 堆栈跟踪(我应该通过 Kibana 看到相同的 ERROR/Exception 堆栈跟踪)

    elasticsearch - 如何在 kubernetes 中使用 fluentd 将不同的应用程序日志获取到 Elasticsearch

    elasticsearch - Fluentd 不会将任何数据刷新到 Elasticsearch - 但会在关闭时刷新

    docker - 由于权限问题,Fluentd 无法访问/var/lib/docker/containers 下的日志

    linux - Fluent Bit 中连续重试之间的时间

    kubernetes - fluentd-es-v1.22 DaemonSet 不创建任何 pod

    json - Kubernetes 使用转义引号将 JSON 日志保存到文件中。为什么?