I have a question about the Elasticsearch Bulk API.
Here is the code I use to call the Bulk API:
public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
    // One IndexRequest per non-empty JSON document, all sent in a single bulk call
    BulkRequest request = new BulkRequest();
    for (String json : jsonList) {
        if (json != null && !json.isEmpty()) {
            request.add(new IndexRequest(index)
                    .source(json, XContentType.JSON));
        }
    }
    // Synchronous call; this is the call that times out in the stack trace below
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    // Per-item responses; getResponse() returns null for items that failed
    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
        switch (bulkItemResponse.getOpType()) {
            case INDEX:
            case CREATE:
                IndexResponse indexResponse = (IndexResponse) itemResponse;
                break;
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                break;
        }
    }
    // Report individual items that failed even though the bulk call itself returned
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                System.out.println("failed: " + failure.getId());
            }
        }
    }
}
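I get a timeout exception now that my data has reached 800,000 records:
java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
I tried splitting the incoming jsonList into smaller pieces, but I still get the same error from time to time.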
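For reference, the splitting step described above could look like the following minimal sketch in plain Java. The class name `JsonBatches` and the batch size are hypothetical; the sketch only groups the non-empty JSON strings into fixed-size batches, each of which would then be sent as its own `BulkRequest`. Splitting by itself does not change the client's 30-second socket timeout, so a batch that still takes longer than that can fail the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: groups JSON documents into fixed-size batches so that
// each batch can be sent as a separate BulkRequest.
public class JsonBatches {

    // Returns batches of at most batchSize non-null, non-empty JSON strings,
    // preserving the original order.
    public static List<List<String>> partition(List<String> jsonList, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>(batchSize);
        for (String json : jsonList) {
            if (json == null || json.isEmpty()) {
                continue; // same filter as the bulkInsert loop in the question
            }
            current.add(json);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("{\"a\":1}", null, "", "{\"b\":2}", "{\"c\":3}");
        // Three valid documents with batchSize 2 give two batches
        System.out.println(partition(docs, 2).size()); // prints 2
    }
}
```

Each batch could then be passed to the existing method as `bulkInsert(index, new ArrayList<>(batch))`.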
I am currently using Elasticsearch 7.6.2.
Exception stack trace:
java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
	at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
	at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
	at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
	at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
	at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:497)
	at com.ESUtil.bulkInsert(ESUtil.java:110)
	at org.download.App1.main(App1.java:167)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:58)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.lang.Thread.run(Unknown Source)
Best answer
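When you use the bulk API to send a large amount of data to Elasticsearch, the client connection has a default timeout of 30 seconds. Elasticsearch cannot finish such a large bulk operation within 30 seconds, so you get this exception.
Timeouts like this are normal for large bulk requests. For your case (indexing specifically), you can do the following: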
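Upgrade the infrastructure and speed up indexing
Scale the cluster, i.e. add more CPU, more memory and better disks, and disable refresh_interval (default 1 second; setting it to -1 disables refresh) to speed up bulk indexing.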
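A minimal sketch of the refresh_interval part, assuming the 7.x High Level REST Client (`elasticsearch-rest-high-level-client`) is on the classpath and `client` is the same `RestHighLevelClient` as in the question. The class and method names are hypothetical. Setting `index.refresh_interval` to `-1` turns periodic refresh off; restoring `1s` afterwards brings back the default mentioned above.

```java
import java.io.IOException;

import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

// Hypothetical helper, written against the 7.x High Level REST Client.
public class BulkLoadTuning {

    // Builds the index settings update for index.refresh_interval ("-1" disables refresh).
    static Settings refreshIntervalSettings(String value) {
        return Settings.builder().put("index.refresh_interval", value).build();
    }

    // Applies the refresh_interval value to the given index and reports
    // whether the cluster acknowledged the change.
    static boolean setRefreshInterval(RestHighLevelClient client, String index, String value)
            throws IOException {
        UpdateSettingsRequest request = new UpdateSettingsRequest(index);
        request.settings(refreshIntervalSettings(value));
        AcknowledgedResponse response =
                client.indices().putSettings(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }
}
```

Typical use would be `setRefreshInterval(client, index, "-1")` before the bulk load and `setRefreshInterval(client, index, "1s")` in a `finally` block afterwards, so the index becomes searchable again even if a batch fails.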
Increase the bulk API timeout
As described in the official ES documentation:
request.timeout(TimeValue.timeValueMinutes(2)); --> 2 min timeout
request.timeout("2m"); --> string format of 2 min.
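Note that `request.timeout(...)` is sent to the server as the bulk request's `timeout` parameter: it bounds how long each action waits for things such as active shards. The `30,000 milliseconds timeout on connection ...` in the stack trace, by contrast, is raised on the client by the Apache HTTP layer used by `RestClient`. A sketch of raising that client-side socket timeout through `RestClientBuilder#setRequestConfigCallback`, assuming the 7.x REST client is on the classpath; the class name `EsClientFactory`, the plain `http` scheme, and the 2-minute value are assumptions.

```java
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

// Hypothetical factory for a High Level REST Client with a longer socket timeout.
public class EsClientFactory {

    // Assumed value: allow up to 2 minutes of socket inactivity per request.
    static final int SOCKET_TIMEOUT_MILLIS = 120_000;

    // Request config customization applied to every request made by the client.
    static RequestConfig.Builder withTimeouts(RequestConfig.Builder builder) {
        return builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
    }

    // Creates a client whose socket timeout replaces the 30-second default.
    static RestHighLevelClient create(String host, int port) {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
                .setRequestConfigCallback(EsClientFactory::withTimeouts);
        return new RestHighLevelClient(builder);
    }
}
```

The client returned by `EsClientFactory.create("localhost", 9200)` would replace the one used in `bulkInsert`; the server-side `request.timeout(...)` can still be set on each `BulkRequest` as shown above.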
Edit: As mentioned in the comments, if you want to check the response of your bulk API immediately, you can use synchronous execution of the bulk API. Here is the quote from the same documentation:
Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
Regarding elasticsearch - Elasticsearch Bulk API, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61477515/