ruby - 将压缩的字节字符串从protobuf转换为通过Logstash格式化的字节

标签 ruby elasticsearch logstash

我有一个protobuf,我正在尝试通过logstash进行喂食。一些值看起来是二进制格式的:

通过rubydebug编解码器打印时

我得到的值(value)是:

rData => '\xD8:\xC9$'

这似乎是因为protobuf模板具有:
optional :bytes, :rData, 5

哪个...会在锡盒上说什么-将原始字节传递给logstash,然后假定它们为文本。

因此,该rData行将解码为216.58.201.36,这是www.google.com的响应。

如何在Elasticsearch中将其转换为有用的格式?

最佳答案

为了做到这一点,您将需要自己编写一些 ruby 。

但是不用担心,它比听起来容易,您可以将ruby内联到logstash配置文件中。

第一:

  • 将输出编解码器设置为rubydebug,以便它打印数据结构-您将需要此代码来标识所需的字段。

  • 因此,对于我的示例(为简洁起见):

    我们对rData字段感兴趣。
    "socketProtocol" => 1,
        "@timestamp" => 2017-12-12T10:26:41.910Z,
       "requestorId" => "",
              "port" => 47788,
          "response" => {
                "rcode" => 0,
                  "rrs" => [
            [0] {
                 "rType" => 1,
                 "rData" => "\xD8:\xC9$",
                "rClass" => 1,
                 "rName" => "www.google.com.",
                  "rTtl" => 300
            }
        ],
    

    我们还有一些更长的rData字段的示例,其中rubydebug的值为:
    "rData" => "*\x00\x14P@\t\b\v\x00\x00\x00\x00\x00\x00 \x04",
    

    最终呈现为Elasticsearch:
    "rData": "*\u0000\u0014P@\t\b\u000b\u0000\u0000\u0000\u0000\u0000\u0000 \u0004",
    

    因此,我们使用event.get("response")提取了此内容,以便我们测试其存在(必要,因为在我的用例中,将有没有数据的response字段):
    filter {
      if [response] {
        ruby {
          code => 
    
            # response rData can be a different things.
            #usually an ipV4 address, or an ipv6. 
            #But they're usually written in different formats - ipv4 is dotted quads,
            #where ipv6 is hex and double-bytes
            #so we look at the (unpacked) string length, and see if there are 4 (or more) 'uint64s' in there. 
            #and substitute accordingly. 
            '
            response = event.get("response")
            if ( response and response["rrs"] and response["rrs"][0] and response["rrs"][0]["rData"] ) 
               rdata = response["rrs"][0]["rData"]
               hex_value = rdata.unpack("H*").join("")
               ip_value =  rdata.unpack("C4").join(".")
               length_rdata =    rdata.unpack("L*").length
               if ( length_rdata >= 4 ) 
                 event.set("[response][decoded_rdata]", hex_value )
               else
                 event.set("[response][decoded_rdata]", ip_value) 
               end
            end
           '
        }
      }
    } 
    

    注意-这会测试rData值的长度,如果它是“long”,则假定它是一个ipv6地址,格式为十六进制,如果它很短,则假定其为ipv4,格式为常规的“点分”四边形。

    然后,它将添加到一个新的子字段response.decoded_rdata,它对 Elasticsearch 可能比嵌套任何更深的字段更有用。

    我们还提供了一个附加代码段来处理from / to / messageId字段的“字节”编码,该编码在很大程度上与以下代码相似:
      ruby {
        code =>  
           #take to and from fields, and assume they're packed IP addresses. 
           #take messageId and convert to hex. 
           'event.set("from", event.get("from").unpack("C4").join("."));
            event.set("to", event.get("to").unpack("C4").join("."));
            event.set("messageId", event.get("messageId").unpack("H*").join(""));
           '
      }
    

    关于ruby - 将压缩的字节字符串从protobuf转换为通过Logstash格式化的字节,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47770696/

    相关文章:

    mysql - Ubuntu 上未初始化常量 MysqlCompat::MysqlRes (mysql gem 错误),未修复

    elasticsearch - 使用 geo_distance 过滤器不会返回任何使用 ElasticSearch 的命中

    apache-spark - Elasticsearch有效的索引管理

    elasticsearch - 在 Logstash 中定义多个输出,同时处理 Elasticsearch 实例的潜在不可用性

    elasticsearch - 如何删除json。我的Elasticsearch字段上的前缀

    ruby-on-rails - ruby::Module 或者只是 Module

    ruby-on-rails - rails : I installed ActiveAdmin and my devise link stopped working

    ruby - 迭代循环访问两个元素(如果存在)

    elasticsearch - Elasticsearch:在字段中获取具有特定值的文档

    sql - 如何在logstash中将数据库版本保存为sql_last_version变量