java - too_many_buckets_exception in elasticsearch

Tags: java elasticsearch elasticsearch-rest-client

I am running into a problem with an Elasticsearch aggregation. We query Elasticsearch from Java using the RestHighLevelClient.

The exception is:

ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=]]; nested: ElasticsearchException[Elasticsearch exception [type=too_many_buckets_exception, reason=Trying to create too many buckets. Must be less than or equal to: [20000] but was [20001]. This limit can be set by changing the [search.max_buckets] cluster level setting.]];

I have already changed search.max_buckets with a PUT request, but I am still facing the problem.

PUT /_cluster/settings
{
  "persistent": {
    "search.max_buckets": 20000
  }
}
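Note that the exception reports the limit as still being [20000] while the request needed 20001 buckets, so a PUT that sets the value to exactly 20000 leaves the effective cap unchanged. One way to check which value is actually in effect (a sketch using the standard cluster-settings API):

```
GET /_cluster/settings?include_defaults=true&filter_path=*.search.max_buckets
```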

First, per our requirements, we have to aggregate the data by day, then by hour, and then by ruleId. The desired aggregation looks like the structure below:

Day {
    1:00 [
        { ruleId: 1, count: 20 },
        { ruleId: 2, count: 25 }
    ],
    2:00 [
        { ruleId: 1, count: 20 },
        { ruleId: 2, count: 25 }
    ]
}

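The same day → hour → ruleId nesting can be sketched as a raw query (field names are taken from the Java code below; the index name is a placeholder):

```
POST your_index/_search
{
  "size": 0,
  "aggs": {
    "by_day": {
      "date_histogram": { "field": "eventDateTime", "fixed_interval": "1d" },
      "aggs": {
        "by_hour": {
          "date_histogram": { "field": "eventDateTime", "fixed_interval": "1h" },
          "aggs": {
            "distinct": {
              "terms": { "field": "policyId", "size": 10000 },
              "aggs": {
                "topHits": {
                  "top_hits": {
                    "size": 1,
                    "sort": [ { "generationTime": { "order": "desc" } } ]
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
```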
Now, my code is:

    // Buckets are traversed with parallelStream() below, so the target list must be thread-safe
    final List<DTO> violationCaseMgmtDtos = Collections.synchronizedList(new ArrayList<>());
        try {
            RangeQueryBuilder queryBuilders =
                (end_timestmp > 0 ? customTimeRangeQueryBuilder(start_timestmp, end_timestmp, generationTime)
                    : daysTimeRangeQueryBuilder(14, generationTime));

            BoolQueryBuilder boolQuery = new BoolQueryBuilder();
            boolQuery.must(queryBuilders);
            boolQuery.must(QueryBuilders.matchQuery("pvGroupBy", true));
            boolQuery.must(QueryBuilders.matchQuery("pvInformation", false));
            TopHitsAggregationBuilder topHitsAggregationBuilder =
                AggregationBuilders.topHits("topHits").docValueField(policyId).sort(generationTime, SortOrder.DESC);

            TermsAggregationBuilder termsAggregation = AggregationBuilders.terms("distinct").field(policyId).size(10000)
                .subAggregation(topHitsAggregationBuilder);

            DateHistogramAggregationBuilder timeHistogramAggregationBuilder =
                AggregationBuilders.dateHistogram("by_hour").field("eventDateTime")
                    .fixedInterval(DateHistogramInterval.HOUR).subAggregation(termsAggregation);

            DateHistogramAggregationBuilder dateHistogramAggregationBuilder =
                AggregationBuilders.dateHistogram("by_day").field("eventDateTime")
                    .fixedInterval(DateHistogramInterval.DAY).subAggregation(timeHistogramAggregationBuilder);

            SearchRequest searchRequest = new SearchRequest(violationDataModel);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
            searchSourceBuilder.query(boolQuery);
            searchSourceBuilder.from(offset);
            searchSourceBuilder.size(10000);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            ParsedDateHistogram parsedDateHistogram = searchResponse.getAggregations().get("by_day");

            parsedDateHistogram.getBuckets().parallelStream().forEach(dayBucket -> {


                ParsedDateHistogram hourBasedData = dayBucket.getAggregations().get("by_hour");

                hourBasedData.getBuckets().parallelStream().forEach(hourBucket -> {

                    // TimeLine timeLine = new TimeLine();
                    String dateTime = hourBucket.getKeyAsString();
                    // long dateInLong = DateUtil.getMiliSecondFromStringDate(dateTime);
                    // timeLine.setViolationEventTime(dateTime);

                    ParsedLongTerms distinctPolicys = hourBucket.getAggregations().get("distinct");
                    distinctPolicys.getBuckets().parallelStream().forEach(policyBucket -> {

                        DTO violationCaseManagementDTO = new DTO();
                        violationCaseManagementDTO.setDataAggregated(true);
                        violationCaseManagementDTO.setEventDateTime(dateTime);
                        violationCaseManagementDTO.setRuleId(Long.valueOf(policyBucket.getKey().toString()));

                        ParsedTopHits parsedTopHits = policyBucket.getAggregations().get("topHits");
                        SearchHit[] searchHits = parsedTopHits.getHits().getHits();
                        SearchHit searchHit = searchHits[0];

                        String source = searchHit.getSourceAsString();
                        ViolationDataModel violationModel = null;
                        try {
                            violationModel = objectMapper.readValue(source, ViolationDataModel.class);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                        violationCaseManagementDTO.setRuleName(violationModel.getRuleName());
                        violationCaseManagementDTO.setGenerationTime(violationModel.getGenerationTime());
                        violationCaseManagementDTO.setPriority(violationModel.getPriority());
                        violationCaseManagementDTO.setStatus(violationModel.getViolationStatus());
                        violationCaseManagementDTO.setViolationId(violationModel.getId());
                        violationCaseManagementDTO.setEntity(violationModel.getViolator());
                        violationCaseManagementDTO.setViolationType(violationModel.getViolationEntityType());
                        violationCaseManagementDTO.setIndicatorsOfAttack( (int)
                            (policyBucket.getDocCount() * violationModel.getNoOfViolatedEvents()));
                        violationCaseMgmtDtos.add(violationCaseManagementDTO);

                    });
                  //  violationCaseMgmtDtos.sort((d1,d2) -> d1.getEventDateTime().compareTo(d2.getEventDateTime()));
                });

            });

            List<DTO> realtimeViolation = findViolationWithoutGrouping(start_timestmp,  end_timestmp,  offset,  size);
            realtimeViolation.stream().forEach(action -> violationCaseMgmtDtos.add(action)); 
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (!violationCaseMgmtDtos.isEmpty()) {
            return violationCaseMgmtDtos.stream()
                .filter(violationDto -> Objects.nonNull(violationDto))
                .sorted((d1,d2) -> d2.getEventDateTime().compareTo(d1.getEventDateTime()))
                .collect(Collectors.toList());
        }
        return violationCaseMgmtDtos;
}
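As a side note on why the limit is being hit: with a terms size of 10000 nested under an hourly histogram, itself nested under a daily histogram over a 14-day window, the worst-case bucket count multiplies far past the 20000 cap. A rough back-of-the-envelope calculation (an illustration, not Elasticsearch's exact internal bucket accounting):

```java
public class BucketMath {
    // Worst-case number of leaf buckets for a day -> hour -> terms nesting
    static long worstCaseBuckets(int days, int hoursPerDay, int termsSize) {
        return (long) days * hoursPerDay * termsSize;
    }

    public static void main(String[] args) {
        long total = worstCaseBuckets(14, 24, 10000);
        System.out.println("worst-case leaf buckets: " + total); // 3360000
        System.out.println("exceeds search.max_buckets=20000: " + (total > 20000));
    }
}
```

Even if far fewer than 10000 distinct ruleIds exist per hour, a few dozen per hour bucket is already enough to cross 20000 over two weeks of hourly buckets.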

Please help me resolve this issue.

Best Answer

If you are using ES version 7.x.x, you can add a terminate_after clause to your query to limit the number of buckets the data will be divided into. This mostly happens when the data you are trying to aggregate over is highly random.

If your data contains text, it is better to aggregate on the .keyword field (assuming you are using the default mappings).

POST your_index/_search
{
  "from": 0,
  "query": {
    "match_all": {}
  },
  "size": 0,
  "sort": [
    {
      "your_target_field": {
        "order": "desc"
      }
    }
  ],
  "terminate_after": 10000,
  "version": true,
  "aggs": {
    "title": {
      "terms": {
        "field": "your_target_field.keyword",
        "size": 10000
      }
    }
  }
}
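On the .keyword point above: with default dynamic mapping, a string value is indexed both as analyzed text and as an exact-value keyword subfield, which is what the .keyword suffix refers to. A quick way to confirm this for your own index (index and field names here are illustrative):

```
GET your_index/_mapping
```

With default dynamic mapping, the response for a string field looks like:

```
"your_target_field": {
  "type": "text",
  "fields": {
    "keyword": { "type": "keyword", "ignore_above": 256 }
  }
}
```

Aggregating on the text field itself either fails or requires fielddata; the keyword subfield is the intended target for terms aggregations.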

Regarding java - too_many_buckets_exception in elasticsearch, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58234784/
