elasticsearch - Elasticsearch Bulk API

Tags: elasticsearch

I have a question about the Elasticsearch Bulk API.

This is my code that uses the Bulk API:

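import java.io.IOException;
import java.util.ArrayList;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;

public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
    BulkRequest request = new BulkRequest();

    // Queue one index operation per non-empty JSON document.
    for (String json : jsonList) {
        if (json != null && !json.isEmpty()) {
            request.add(new IndexRequest(index)
                    .source(json, XContentType.JSON));
        }
    }

    // Synchronous call: blocks until the whole bulk request completes or times out.
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();

        // Cast each item response to the type that matches its operation.
        switch (bulkItemResponse.getOpType()) {
        case INDEX:
        case CREATE:
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            break;
        }
    }

    // Report the ID of every item that failed.
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                BulkItemResponse.Failure failure =
                        bulkItemResponse.getFailure();

                System.out.println("failed: " + failure.getId());
            }
        }
    }
}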

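I get a timeout exception because my data has grown to 800,000 records.
java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]

I tried splitting the incoming jsonList into smaller parts, but the same error still occurs sometimes.

I am currently using Elasticsearch version 7.6.2.

Exception trace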

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:497)
    at com.ESUtil.bulkInsert(ESUtil.java:110)
    at org.download.App1.main(App1.java:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:58)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.lang.Thread.run(Unknown Source)

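Best answer

When you use the bulk API and send a large amount of data to Elasticsearch, the default timeout for that connection is 30 seconds. Elasticsearch cannot finish such a large bulk operation within 30 seconds, so you get this exception.

This timeout is normal for bulk API calls. For your case (indexing specifically), you can do the following:

Upgrade the infrastructure and speed up indexing

Scale your cluster, i.e. add more CPU, more memory, and better disks, and disable refresh_interval (default is 1 second; setting it to -1 turns refresh off) to speed up bulk indexing.

Increase the bulk API timeout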

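As described in the official ES doc, the bulk request accepts a timeout in either of the two forms shown here. Note that the 30,000 ms in the stack trace is the REST client's socket timeout, which is configured separately on the client builder (setRequestConfigCallback), so that value may need to be raised as well.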

request.timeout(TimeValue.timeValueMinutes(2));   // 2-minute timeout as a TimeValue
request.timeout("2m");                            // the same 2-minute timeout in string format

Edit: As mentioned in the comments, if you are currently using the sync execution of the bulk API, you can check the response of your bulk API. Below is a quote from the same doc:

Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
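
The question mentions splitting jsonList into smaller parts. One way that could look, as a rough sketch only: the helper below uses plain Java with no Elasticsearch dependency, and the names BulkBatcher, BatchSender, partition, and forEachBatch are hypothetical, not part of any Elasticsearch API. The batch size of 1000 is an arbitrary assumption; a suitable value depends on document size and cluster capacity.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits documents into fixed-size batches, one bulk request each. */
final class BulkBatcher {

    /** Hypothetical callback that sends one batch, e.g. by calling bulkInsert. */
    @FunctionalInterface
    interface BatchSender {
        void send(List<String> batch) throws IOException;
    }

    private BulkBatcher() {}

    /** Groups the non-null, non-empty documents into batches of at most batchSize. */
    static List<List<String>> partition(List<String> jsonList, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String json : jsonList) {
            if (json == null || json.isEmpty()) {
                continue; // same filter as in bulkInsert
            }
            current.add(json);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // final, partially filled batch
        }
        return batches;
    }

    /** Passes each batch to the sender in order and returns the number of batches sent. */
    static int forEachBatch(List<String> jsonList, int batchSize, BatchSender sender)
            throws IOException {
        List<List<String>> batches = partition(jsonList, batchSize);
        for (List<String> batch : batches) {
            sender.send(batch);
        }
        return batches.size();
    }

    public static void main(String[] args) throws IOException {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            docs.add("{\"id\":" + i + "}");
        }
        // Stand-in sender that only prints; a real one would issue the bulk request.
        int sent = forEachBatch(docs, 1000,
                batch -> System.out.println("would send " + batch.size() + " documents"));
        System.out.println(sent + " batches");
    }
}
```

With the question's method, the sender could be written as batch -> bulkInsert(index, new ArrayList<>(batch)), so that each bulk request carries at most one batch. Smaller requests finish sooner, but a single request can still exceed the client timeout if the cluster is overloaded, which may be why the error still appeared after splitting.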

This question, elasticsearch - Elasticsearch Bulk API, comes from Stack Overflow: https://stackoverflow.com/questions/61477515/
