elasticsearch - Elasticsearch pipelining through a children aggregation

Tags: elasticsearch aggregation pipeline

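I am trying to roll up data through a children aggregation in Elasticsearch 2.1.

Using pipeline aggregations, I want to take the counts computed below the child documents and sum them at the level of the child's parent: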

{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "unit": {
      "terms": {
        "size": 500,
        "field": "unit_id"
      },
      "aggs": {
        "total_active_ministers_by_unit": {
          "sum_bucket": {
            "buckets_path": "ministers>active_minister_by_ministry.value"
          }
        },
        "ministers": {
          "children": {
            "type": "member_ministry"
          },
          "aggs": {
            "active_minister_by_ministry": {
              "sum_bucket": {
                "buckets_path": "ministry>active_minister._count"
              }
            },
            "ministry": {
              "terms": {
                "field": "member_version",
                "size": 1,
                "order": {
                  "_term": "desc"
                }
              },
              "aggs": {
                "active_minister": {
                  "filter": {
                    "range": {
                      "ministry_date_start": {
                        "lte": "now/d",
                        "gte": "now-96M/M"
                      }
                    }
                  }
                },
                "ministry_type": {
                  "terms": {
                    "field": "ministry"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

After running the query, I get the following response from the REST API:
{
  "error": {
    "root_cause": [

    ],
    "type": "reduce_search_phase_exception",
    "reason": "[reduce] ",
    "phase": "fetch",
    "grouped": true,
    "failed_shards": [

    ],
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "Cannot find an aggregation named [value] in [active_minister]"
    }
  },
  "status": 503
}

The Elasticsearch server console shows the following:
[2015-12-03 12:19:57,974][INFO ][rest.suppressed          ] /unit_member/unit/_search Params: {index=unit_member, type=unit}
Failed to execute phase [fetch], [reduce] 
    at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction$2.onFailure(TransportSearchQueryThenFetchAction.java:162)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Cannot find an aggregation named [value] in [active_minister]
    at org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation.getProperty(InternalSingleBucketAggregation.java:111)
    at org.elasticsearch.search.aggregations.InternalMultiBucketAggregation$InternalBucket.getProperty(InternalMultiBucketAggregation.java:98)
    at org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue(BucketHelpers.java:160)
    at org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator.doReduce(BucketMetricsPipelineAggregator.java:74)
    at org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator.reduce(SiblingPipelineAggregator.java:68)
    at org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:155)
    at org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:170)
    at org.elasticsearch.search.aggregations.bucket.terms.InternalTerms$Bucket.reduce(InternalTerms.java:110)
    at org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.doReduce(InternalTerms.java:220)
    at org.elasticsearch.search.aggregations.InternalAggregation.reduce(InternalAggregation.java:153)
    at org.elasticsearch.search.aggregations.InternalAggregations.reduce(InternalAggregations.java:170)
    at org.elasticsearch.search.controller.SearchPhaseController.merge(SearchPhaseController.java:409)
    at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction$2.doRun(TransportSearchQueryThenFetchAction.java:149)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    ... 3 more

Is this a bug that should be reported, or am I approaching pipeline aggregations the wrong way?

Here is my mapping:
"member_ministry": {
    "dynamic": "false",
    "_routing": {
      "required": true
    },
    "properties": {
      "date_ended": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "birthday": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "parent_path": {
        "index": "not_analyzed",
        "type": "string"
      },
      "forename": {
        "type": "string"
      },
      "ministry": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "lastname": {
        "type": "string"
      },
      "unit_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "ministry_date_start": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "date_created": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "ministry_date_end": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "sealing_date": {
        "ignore_malformed": true,
        "format": "yyyy-MM-dd",
        "type": "date"
      },
      "ministry_unit_ids": {
        "index": "not_analyzed",
        "type": "string"
      },
      "member_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "version_state": {
        "index": "not_analyzed",
        "type": "string"
      },
      "member_ministry_id": {
        "index": "not_analyzed",
        "type": "string"
      },
      "gender": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "member_version": {
        "store": true,
        "type": "integer"
      },
      "client_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      },
      "primary_ministry_id": {
        "index": "not_analyzed",
        "type": "string"
      }
    },
    "_parent": {
      "type": "unit"
    }
  },
  "unit": {
    "dynamic": "false",
    "properties": {
      "versions": {
        "properties": {
          "apostle_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_ended": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "unit_type_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "parent_path": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "sub_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_created": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "bishop_district_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "administration_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "date_opened": {
            "ignore_malformed": true,
            "format": "yyyy-MM-dd",
            "type": "date"
          },
          "district_church_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_apostle_helper_area_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "international_church_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_type": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "district_apostle_area_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "unit_version": {
            "store": true,
            "type": "integer"
          },
          "client_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          },
          "regional_administration_center_id": {
            "index": "not_analyzed",
            "store": true,
            "type": "string"
          }
        }
      },
      "unit_id": {
        "index": "not_analyzed",
        "store": true,
        "type": "string"
      }
    }
  }

Here is a minimal data set from the index.

Units:

http://pastie.org/private/wippg2zfdrsiul5iaqiga

Member ministries:

http://pastie.org/private/4xrnyeygnac5abcnwskg

Best answer

My guess is that this is a bug.

When I take this part out:

        "total_active_ministers_by_unit": {
           "sum_bucket": {
              "buckets_path": "ministers>active_minister_by_ministry.value"
           }
        },

I get these results:
{
   "took": 5,
   "timed_out": false,
   "_shards": {
      "total": 1,
      "successful": 1,
      "failed": 0
   },
   "hits": {
      "total": 8,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "unit": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "3a7d74d8bfc6b841",
               "doc_count": 3,
               "ministers": {
                  "doc_count": 2,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 1,
                     "buckets": [
                        {
                           "key": 2,
                           "doc_count": 1,
                           "active_minister": {
                              "doc_count": 1
                           },
                           "ministry_type": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                 {
                                    "key": "District Evangelist",
                                    "doc_count": 1
                                 }
                              ]
                           }
                        }
                     ]
                  },
                  "active_minister_by_ministry": {
                     "value": 1
                  }
               }
            },
            {
               "key": "3a7d74e834bbc0d6",
               "doc_count": 2,
               "ministers": {
                  "doc_count": 1,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": [
                        {
                           "key": 2,
                           "doc_count": 1,
                           "active_minister": {
                              "doc_count": 1
                           },
                           "ministry_type": {
                              "doc_count_error_upper_bound": 0,
                              "sum_other_doc_count": 0,
                              "buckets": [
                                 {
                                    "key": "Apostle",
                                    "doc_count": 1
                                 }
                              ]
                           }
                        }
                     ]
                  },
                  "active_minister_by_ministry": {
                     "value": 1
                  }
               }
            },
            {
               "key": "3a7b3abb09bfec65",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            },
            {
               "key": "3a7d707e45c8dc24",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            },
            {
               "key": "3a7d70975c84428c",
               "doc_count": 1,
               "ministers": {
                  "doc_count": 0,
                  "ministry": {
                     "doc_count_error_upper_bound": 0,
                     "sum_other_doc_count": 0,
                     "buckets": []
                  },
                  "active_minister_by_ministry": {
                     "value": 0
                  }
               }
            }
         ]
      }
   }
}

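However, trying to pipeline across the parent-child relationship (from the unit buckets through the children aggregation) produces the error.

Unless I am missing something, this should be reported as a bug.

Here is the code I used to test the query: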

http://sense.qbox.io/gist/075732dfe51916538ff13ea76a2cec1f7532c691
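
While the outer sum_bucket fails on the server, the per-unit figure can still be read client-side from the response shown above, because each unit bucket already carries ministers.active_minister_by_ministry.value. What follows is only a sketch in Python under that assumption. The function name active_ministers_by_unit and the trimmed sample_response are illustrative and are not part of the original question or of any Elasticsearch client API.

```python
# Hypothetical helper: reads the value that the working inner sum_bucket
# already produced, instead of asking Elasticsearch for an outer
# sum_bucket across the children aggregation.
def active_ministers_by_unit(response):
    """Map each unit bucket key to its active minister count."""
    totals = {}
    for bucket in response["aggregations"]["unit"]["buckets"]:
        ministers = bucket.get("ministers", {})
        pipeline = ministers.get("active_minister_by_ministry", {})
        # Treat a missing or null value as 0 (an assumption of this sketch).
        totals[bucket["key"]] = pipeline.get("value") or 0
    return totals


# Trimmed copy of two buckets from the response shown in the answer.
sample_response = {
    "aggregations": {
        "unit": {
            "buckets": [
                {
                    "key": "3a7d74d8bfc6b841",
                    "doc_count": 3,
                    "ministers": {
                        "doc_count": 2,
                        "active_minister_by_ministry": {"value": 1},
                    },
                },
                {
                    "key": "3a7b3abb09bfec65",
                    "doc_count": 1,
                    "ministers": {
                        "doc_count": 0,
                        "active_minister_by_ministry": {"value": 0},
                    },
                },
            ]
        }
    }
}

totals = active_ministers_by_unit(sample_response)
print(totals)
```

Since the ministry terms aggregation in the query keeps only one bucket (size 1, ordered by _term descending), the inner sum covers just the highest member_version per unit, so this value should already be the per-unit total the outer pipeline was meant to produce.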

Source: this thread corresponds to a question on Stack Overflow: https://stackoverflow.com/questions/34064760/
