elasticsearch - 浏览所有文档并批量更新其中一些

标签 elasticsearch jest

我正在使用Jest client for Elastic浏览文档索引以更新一个字段。我的工作流程是运行带有分页的空查询,并查看是否可以计算额外的字段。如果可以,我将一次批量更新相关文档。

伪码

private void process() {
    int from = 0
    int size = this.properties.batchSize
    boolean moreResults = true
    while (moreResults) {
        moreResults = handleBatch(from, this.properties.batchSize)
        from += size
    }
}

private boolean handleBatch(int from, int size) {
    log.info("Processing records $from to " + (from + size))
    def result = search(from, size)
    if (result.isSucceeded()) {
        // Check each element and perform an upgrade
    }
    // return true if the query returned at least one item
}

private SearchResult search(int from, int size) {
    String query =
            '{ "from": ' + from + ', ' +
                    '"size": ' + size + '}'


    Search search = new Search.Builder(query)
            .addIndex("my-index")
            .addType('my-document')
            .build();
    jestClient.execute(search)
}

我没有任何错误,但是当我多次运行该批处理时,似乎正在寻找要升级的"new"文档,而文档总数没有改变。我怀疑更新的文档已被处理了几次,可以通过检查已处理的ID来确认。

如何运行查询,以使原始文档得到处理,并且任何更新都不会干扰它?

最佳答案

除了运行常规搜索(即使用from + size)以外,您需要运行 scroll search query。主要区别在于,滚动将冻结给定的文档快照(在查询时)并进行查询。在第一个滚动查询之后发生的任何更改都不会考虑。

使用Jest,您需要修改代码,使其看起来像这样:

    // 1. Initiate the scroll request
    Search search = new Search.Builder(searchSourceBuilder.toString())
            .addIndex("my-index")
            .addType("my-document")
            .addSort(new Sort("_doc"))
            .setParameter(Parameters.SIZE, size)
            .setParameter(Parameters.SCROLL, "5m")
            .build();
    JestResult result = jestClient.execute(search);

    // 2. Get the scroll_id to use in subsequent request
    String scrollId = result.getJsonObject().get("_scroll_id").getAsString();

    // 3. Issue scroll search requests until you have retrieved all results
    boolean moreResults = true;
    while (moreResults) {
        SearchScroll scroll = new SearchScroll.Builder(scrollId, "5m")
                .setParameter(Parameters.SIZE, size).build();
        result = client.execute(scroll);
        def hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
        moreResults = hits.size() > 0;
    }

您需要使用以上代码修改processhandleBatch方法。它应该简单明了,如果没有的话让我知道。

关于elasticsearch - 浏览所有文档并批量更新其中一些,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35381294/

相关文章:

python - 使用数组作为搜索输入的Elasticsearch查询

scala - 使用elastic4s在搜索中获得零结果

elasticsearch - 无法访问端口5601上的Docker容器中运行的Kibana

elasticsearch - 使用updateBuilder时不会发生Elasticsearch部分更新

Elasticsearch 部分更新与完整更新?

elasticsearch - Elasticsearch地理距离查询范围

elasticsearch - 使用JEST通过查询删除ElasticSearch

elasticsearch - 如何使用Jest在 Elasticsearch 中获取索引的创建时间

spring - 如果我减少@Document批注中的shards参数,Elasticsearch会自行减少shards数量吗?

elasticsearch - Elasticsearch:请求[/some index/_refresh]包含无法识别的参数:[refresh]