elasticsearch - Catching Elasticsearch bulk errors when using bulkProcessor

Tags: elasticsearch bulkinsert elasticsearch-2.0 bulkupdate

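I am using bulkProcessor to insert/update documents in bulk in Elasticsearch.
I want to catch

  • EsRejectedExecutionException
  • VersionConflictEngineException
  • DocumentAlreadyExistsException

but nothing is thrown; the error only shows up as a message on the corresponding response item.
How do I handle this properly? For example, if a request was rejected, apply a retry...
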
    public BulkResponse bulkUpdate(.....) throws Exception {
        BulkResponse bulkWriteResult = null;
        long startTime = System.currentTimeMillis();
        AtomicInteger amountOfRequests = new AtomicInteger();
        long esTime;
    
    
        ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations);
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
            .setBulkActions(MAX_BULK_ACTIONS)
            .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB))
            .setConcurrentRequests(5)
            .build();
    
    
        updateOperations.forEach(updateRequest -> {
            bulkProcessor.add(updateRequest);
            amountOfRequests.getAndIncrement();
        });
    
        try {
            boolean isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS);
            if (isFinished) {
                if (listener.getBulkWriteResult() != null) {
                    bulkWriteResult = listener.getBulkWriteResult();
                } else {
                    throw new Exception("Bulk updating failed, results are empty");
                }
            } else {
                throw new Exception("Bulk updating failed, received timeout");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return bulkWriteResult;
    }
    
    
    public class ElasticBulkProcessorListener implements BulkProcessor.Listener {
    private long esTime = 0;
    private List<Throwable> errors;
    private BulkResponse response;
    
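    public long getEsTime() {
        return esTime;
    }

    // Exposes the last bulk response captured in afterBulk; bulkUpdate() calls this.
    public BulkResponse getBulkWriteResult() {
        return response;
    }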
    
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        String description = "";
        if (!request.requests().isEmpty()) {
            ActionRequest request1 = request.requests().get(0);
            description = ((UpdateRequest) request1).type();
        }
    
        log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}",
                executionId, (request.estimatedSizeInBytes() / 1000000), request.numberOfActions(), description);
    }
    
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length);
        esTime = response.getTookInMillis();
        // Assign the field; the bare name "response" refers to the method parameter.
        this.response = createBulkUpdateResult(response);
    }
    
    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        log.error("Bulk executionID: {} failed! error: ", executionId, failure);
        throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure);
    }
    

    }

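Best answer

The failure handler, afterBulk(long, BulkRequest, Throwable), is called only when a network failure occurs; every other case ends up in the success handler, afterBulk(long, BulkRequest, BulkResponse).

As I mentioned above, the only way to handle these exceptions is to go through each response item and find out what happened.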
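
A minimal sketch of that per-item inspection, so the retry decision from the question has something concrete to hang on. To keep it compilable without the Elasticsearch jar, `ItemResult` is a hypothetical stand-in for one entry of `BulkResponse.getItems()`; with the real client the same values would come from `BulkItemResponse` (`isFailed()`, `getId()`, `getFailureMessage()`, `getFailure().getStatus()`). The mapping of rejections to status 429 and of version conflicts and "document already exists" to status 409 is an assumption worth checking against the Elasticsearch version in use, and the names `BulkItemTriage`, `Action`, `classify` and `idsToRetry` are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BulkItemTriage {

    // What the caller should do with one bulk item.
    enum Action { OK, RETRY, CONFLICT, FAIL }

    // Hypothetical stand-in for BulkItemResponse (not an Elasticsearch class).
    static final class ItemResult {
        final String id;
        final boolean failed;
        final int status;      // HTTP-style status reported for this item
        final String message;  // failure message, or null on success

        ItemResult(String id, boolean failed, int status, String message) {
            this.id = id;
            this.failed = failed;
            this.status = status;
            this.message = message;
        }
    }

    // Assumption: 429 = rejected by the bulk thread pool (retry may help),
    // 409 = version conflict or document already exists (retry will not help).
    static Action classify(ItemResult item) {
        if (!item.failed) {
            return Action.OK;
        }
        switch (item.status) {
            case 429: return Action.RETRY;
            case 409: return Action.CONFLICT;
            default:  return Action.FAIL;
        }
    }

    // Collects the ids of items worth re-submitting in a follow-up bulk request.
    static List<String> idsToRetry(List<ItemResult> items) {
        List<String> retry = new ArrayList<>();
        for (ItemResult item : items) {
            if (classify(item) == Action.RETRY) {
                retry.add(item.id);
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        List<ItemResult> items = new ArrayList<>();
        items.add(new ItemResult("1", false, 200, null));
        items.add(new ItemResult("2", true, 429, "rejected execution"));
        items.add(new ItemResult("3", true, 409, "version conflict"));
        for (ItemResult item : items) {
            System.out.println(item.id + " -> " + classify(item));
        }
        System.out.println("retry: " + idsToRetry(items));
    }
}
```

In the listener from the question, a loop like this would sit in the success handler `afterBulk(long, BulkRequest, BulkResponse)`; the requests matching the returned ids could then be added to the processor again, ideally with a cap on the number of attempts.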

Source: "elasticsearch - Catching Elasticsearch bulk errors when using bulkProcessor" on Stack Overflow: https://stackoverflow.com/questions/42527971/