elasticsearch - Elasticsearch 中的分组数据,两个键具有相同的值

标签 elasticsearch aggregation

我才刚刚开始学习 flex 搜索,并且面临群体聚合方面的问题。我有一个关于 flex 搜索的数据集,例如:

[{
    srcIP : "10.0.11.12",
    dstIP : "19.67.78.91",
    totalMB : "0.25"
},{
    srcIP : "10.45.11.62",
    dstIP : "19.67.78.91",
    totalMB : "0.50"
},{
    srcIP : "13.67.52.91",
    dstIP : "10.0.11.12",
    totalMB : "0.75"
},{
    srcIP : "10.23.64.12",
    dstIP : "10.45.11.62",
    totalMB : "0.25"
}]

我只想根据 srcIP 对数据进行分组,并对字段 totalMB 求和,但我只想添加更多信息,例如在scrIP上进行分组时,它将 srcIP 值与 dstIP 匹配并对 dstIP 总MB 求和。

输出应该是这样的:
buckets : [{
    key : "10.0.11.12",
    total_GB_SrcIp :{
        value : "0.25"
    },
    total_GB_dstIP :{
        value : "0.75"
    }
},
{
    key : "10.45.11.62",
    total_MB_SrcIp :{
        value : "0.50"
    },
    total_MB_dstIP :{
        value : "0.25"
    }
}]

我已经完成了一个键的常规聚合,但是没有得到针对我的问题的最终查询。
查询:
GET /index*/_search
{
    size : 0,
    "aggs": {
        "group_by_srcIP": {
          "terms": {
        "field": "srcIP",
        "size": 100,
        "order": {
          "total_MB_SrcIp": "desc"
        }
          },
          "aggs": {
        "total_MB_SrcIp": {
          "sum": {
            "field": "TotalMB"
          }
        }
          }
    }
  }
}

希望您根据示例输出了解我的问题。
提前致谢。

最佳答案

根据我的理解,您需要针对两个其他字段(srcIP,dstIP)中的不同值在字段(totalMB)上进行汇总汇总。

AFAIK, flex 搜索不适用于聚合多个字段的值,除非您使用某些文档ingestion将这些字段组合在一起或在应用程序端本身将其组合。 (不过,我在这里可能错了)。

我尝试使用scripted_metric聚合来获取所需的输出。 (如果您不知道它是什么或它如何工作,请阅读它)

我尝试了无痛脚本,以进行以下汇总操作:

  • 从每个文档
  • 中选择srcIp,dstIp和totalMB
  • 在 map
  • 中填充像IP-> {(src:totalMBs),(dst:totalMBs)}这样的跨映射
  • 作为聚合
  • 的结果返回此 map

    这是带有聚合的实际搜索查询:
    GET /testIndex/testType/_search
    {
      "size": 0,
      "aggs": {
        "ip-addr": {
          "scripted_metric": {
            "init_script": "params._agg.addrs = []",
            "map_script": "def lst = []; lst.add(doc.srcIP.value); lst.add(doc.dstIP.value); lst.add(doc.totalMB.value); params._agg.addrs.add(lst);",
            "combine_script": "Map ipMap = new HashMap(); for(entry in params._agg.addrs) { def srcIp = entry.get(0); def dstIp = entry.get(1); def mbs = entry.get(2); if(ipMap.containsKey(srcIp)) {def srcMbSum = mbs + ipMap.get(srcIp).get('srcMB'); ipMap.get(srcIp).put('srcMB',srcMbSum); } else {Map types = new HashMap(); types.put('srcMB', mbs); types.put('dstMB', 0.0); ipMap.put(srcIp, types); } if(ipMap.containsKey(dstIp)) {def dstMbSum = mbs + ipMap.get(dstIp).get('dstMB'); ipMap.get(dstIp).put('dstMB',dstMbSum); } else {Map types = new HashMap(); types.put('srcMB', 0.0); types.put('dstMB', mbs); ipMap.put(dstIp, types); } } return ipMap;",
            "reduce_script": "Map resultMap = new HashMap(); for(ipMap in params._aggs) {for(entry in ipMap.entrySet()) {def ip = entry.getKey(); def srcDestMap = entry.getValue(); if(resultMap.containsKey(ip)) {Map types = new HashMap(); types.put('srcMB', srcDestMap.get('srcMB') + resultMap.get(ip).get('srcMB')); types.put('dstMB', srcDestMap.get('dstMB') + resultMap.get(ip).get('dstMB')); resultMap.put(ip, types); } else {resultMap.put(ip, srcDestMap); } } } return resultMap;"
          }
        }
      }
    }
    

    以下是实验详细信息:

    索引映射:
    GET testIndex/_mapping
    {
      "testIndex": {
        "mappings": {
          "testType": {
            "dynamic": "true",
            "_all": {
              "enabled": false
            },
            "properties": {
              "dstIP": {
                "type": "ip"
              },
              "srcIP": {
                "type": "ip"
              },
              "totalMB": {
                "type": "double"
              }
            }
          }
        }
      }
    }
    

    样本输入:
    POST testIndex/testType
    {
        "srcIP" : "10.0.11.12",
        "dstIP" : "19.67.78.91",
        "totalMB" : "0.25"
    }
    
    POST testIndex/testType
    {
        "srcIP" : "10.45.11.62",
        "dstIP" : "19.67.78.91",
        "totalMB" : "0.50"
    }
    
    POST testIndex/testType
    {
        "srcIP" : "13.67.52.91",
        "dstIP" : "10.0.11.12",
        "totalMB" : "0.75"
    }
    
    POST testIndex/testType
    {
        "srcIP" : "10.23.64.12",
        "dstIP" : "10.45.11.62",
        "totalMB" : "0.25"
    }
    

    查询输出:
    {
      "took": 3,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": 4,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "ip-addr": {
          "value": {
            "13.67.52.91": {
              "srcMB": 0.75,
              "dstMB": 0
            },
            "10.23.64.12": {
              "srcMB": 0.25,
              "dstMB": 0
            },
            "10.45.11.62": {
              "srcMB": 0.5,
              "dstMB": 0.25
            },
            "19.67.78.91": {
              "srcMB": 0,
              "dstMB": 0.75
            },
            "10.0.11.12": {
              "srcMB": 0.25,
              "dstMB": 0.75
            }
          }
        }
      }
    }
    

    这是可读查询,可帮助您更好地理解。
    "scripted_metric": {
      "init_script": "params._agg.addrs = []",
      "map_script": """
          def lst = [];
          lst.add(doc.srcIP.value);
          lst.add(doc.dstIP.value);
          lst.add(doc.totalMB.value);
          params._agg.addrs.add(lst);
        """,
      "combine_script": """
          Map ipMap = new HashMap();
          for(entry in params._agg.addrs) { 
            def srcIp = entry.get(0);
            def dstIp = entry.get(1);
            def mbs = entry.get(2);
    
            if(ipMap.containsKey(srcIp)) { 
              def srcMbSum = mbs + ipMap.get(srcIp).get('srcMB');
              ipMap.get(srcIp).put('srcMB',srcMbSum);
            } else {
              Map types = new HashMap();
              types.put('srcMB', mbs);
              types.put('dstMB', 0.0);
              ipMap.put(srcIp, types);
            }
    
            if(ipMap.containsKey(dstIp)) {
              def dstMbSum = mbs + ipMap.get(dstIp).get('dstMB');
              ipMap.get(dstIp).put('dstMB',dstMbSum);
            } else {
              Map types = new HashMap();
              types.put('srcMB', 0.0);
              types.put('dstMB', mbs);
              ipMap.put(dstIp, types);
            }
          }
          return ipMap;
        """,
      "reduce_script": """
          Map resultMap = new HashMap();
          for(ipMap in params._aggs) {
            for(entry in ipMap.entrySet()) {
              def ip = entry.getKey();
              def srcDestMap = entry.getValue();
    
              if(resultMap.containsKey(ip)) {
                Map types = new HashMap();
                types.put('srcMB', srcDestMap.get('srcMB') + resultMap.get(ip).get('srcMB'));
                types.put('dstMB', srcDestMap.get('dstMB') + resultMap.get(ip).get('dstMB'));
                resultMap.put(ip, types);
              } else {
                resultMap.put(ip, srcDestMap);
              } 
            } 
          }
          return resultMap;
        """
    }
    

    但是,在深入探讨之前,建议您对一些样本数据进行测试,然后检查其是否有效。脚本化度量标准聚合确实对查询性能有相当大的影响。

    还有一件事,要获得聚合结果中必需的键字符串,请根据需要将脚本中所有出现的'srcMB'和'dstMB'替换为' total_GB_SrcIp '和' total_GB_DstIp '。

    希望这可以帮助您或某个人。

    仅供引用,我在ES v5.6.11上对此进行了测试。

    关于elasticsearch - Elasticsearch 中的分组数据,两个键具有相同的值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57756088/

    相关文章:

    elasticsearch - 如何在elasticsearch中配置discovery.type?

    MongoDB 以并行阶段展开或组合

    java - UML 图帮助(聚合/组合)

    java - 如何用java编写elasticsearch查询聚合?

    Java - 确保集合中的项目只能使用一次

    node.js - 与elasticsearch连接的node.js客户端中的Econn重置和堆内存空间不足

    elasticsearch - Elastic Search 是否支持 ACID 属性

    elasticsearch - 为单个 Elasticsearch 索引维护两个别名的用途是什么

    java - Java进程内存大小和堆大小的区别

    python - 通过动态键名对字典进行分组并聚合Python中嵌套字典的一些键