java - 当前队列负载如何调用Elastic Search?

标签 java elasticsearch queue

在广泛查询 ES 时,我得到

Failed to execute [org.elasticsearch.action.search.SearchRequest@59e634e2] lastShard [true]
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.
action.SearchServiceTransportAction$23@75bd024b
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:551)
        at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteQuery(SearchServiceTransportAction.java:228)
        at org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction$AsyncAction.sendExecuteFirstPhase(TransportSearchQueryThenFetchAction.java:83)

非常定期。

我现在的计划是暂停查询请求,直到队列负载低于x。您可以查询客户端的 stats

client.admin().cluster().threadPool().stats().iterator();

但由于我的客户端不是数据节点(我想这就是原因),我得到 queue=0 返回,而服务器节点抛出上述错误。

我知道为什么会抛出这个错误,我知道如何更新设置,但这只会推迟这个错误,并产生其他错误...

我如何询问集群节点他们的队列负载是多少?

PS:我使用的是 Java Api

我已经尝试过,没有要求的结果,空行表示另一次尝试,除非另有说明

//Nodes stats
final NodesStatsResponse nodesStatsResponse = client.admin().cluster().prepareNodesStats().execute().actionGet();
final NodeStats nodeStats = nodesStatsResponse.getNodes()[0];
final String nodeId = nodeStats.getNode().getId(); // need this later on

// same as before, but with explicit NodesStatsRequest (with id)
final NodesStatsResponse response = client.admin().cluster().nodesStats(new NodesStatsRequest(nodeId)).actionGet();
final NodeStats[] nodeStats2 = response.getNodes();
for (NodeStats nodeStats3 : nodeStats2) {
    Stats stats = nodeStats3.getThreadPool().iterator().next();
}

// Cluster?
final ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequestBuilder(client.admin().cluster()).request();
final ClusterStatsResponse clusterStatsResponse = client.admin().cluster().clusterStats(clusterStatsRequest).actionGet();
final ClusterStatsNodes clusterStatsNodes = clusterStatsResponse.getNodesStats();

// Nodes info?
final NodesInfoResponse infoResponse = client.admin().cluster().nodesInfo(new NodesInfoRequest(nodeId)).actionGet();// here
final NodeInfo[] nodeInfos = infoResponse.getNodes();
for (final NodeInfo nodeInfo : nodeInfos) {
    final ThreadPoolInfo info = nodeInfo.getThreadPool();
    final Iterator<Info> infoIterator = info.iterator();
    while (infoIterator.hasNext()) {
        final Info realInfo = infoIterator.next();
        SizeValue sizeValue = realInfo.getQueueSize();
        // is no == null, then (¿happens?, was expecting a nullpointer, but Thread disappeared)
        if (sizeValue == null) 
            continue;
        // normal queue size, no load (oddly found 1000 (expected), and one of 200 in one node?)
        final long queueSize = sizeValue.getSingles(); 
    }
}

问题是某些进程需要立即调用(例如用户请求),而其他进程可能会在数据库太忙时等待(后台进程)。我最好将一定数量的队列分配给处理即时请求的进程,将另一部分分配给后台进程(但我没有看到此选项)。

更新 看起来,我没想到的是,当单独搜索的总数超过 1000 时(当 x 分片或 x 索引除以 1000/x 时,单个批量查询会出现查询过载)搜索)。所以批量化不是一个选项,除非您可以进行单个 查询。因此,当您一次定位 700 个搜索结果时(考虑到上述声明),您需要知道队列中是否有超过 300 个项目,因为它会抛出东西。

总结:

假设每次调用的负载是最大 bulkrequest 因此我无法合并请求。那么,我如何才能在 elasticsearch 开始抛出上述异常之前开始暂停请求。所以我可以暂停应用程序的一部分,但不能暂停另一部分?如果我知道队列已满,比如说,在中途,后台进程必须 hibernate 一段时间。我如何知道(近似)队列负载?

最佳答案

您试图查看队列使用情况的方式是错误的,因为您没有查看正确的统计数据。

看看这段代码:

    final NodesStatsResponse response = client.admin().cluster().prepareNodesStats().setThreadPool(true).execute().actionGet();
    final NodeStats[] nodeStats2 = response.getNodes();

    for (NodeStats nodeStats3 : nodeStats2) {
        ThreadPoolStats stats = nodeStats3.getThreadPool();

        if (stats != null)
            for (ThreadPoolStats.Stats threadPoolStat : stats) {
                System.out.println("node `" + nodeStats3.getNode().getName() + "`" + " has pool `" + threadPoolStat.getName() + "` with current queue size " + threadPoolStat.getQueue());
            }
    }

首先,您需要setThreadPool(true) 才能取回线程池统计信息,否则它将为null

其次,您需要 ThreadPoolStats 而不是用于线程池设置的 ThreadPoolInfo

所以,这是您的第二次尝试,但未完成。您看到的 1000 是设置本身(最大队列大小),而不是实际负载。

关于java - 当前队列负载如何调用Elastic Search?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31670022/

相关文章:

ios - 将进度条添加到 UIAlertController 并显示更新

ElasticSearch:preserve_position_increments 不工作

elasticsearch - Elasticsearch:将cutoff_frequency与and运算符结合?

elasticsearch - Elasticsearch :字符串数组上的完全匹配查询

java - Android平台的R.id是如何生成的?

c# - 在 C# 中线程化时锁定变量

java - 队列<整数> q = new LinkedList<整数>()

java - 从套接字获取十六进制值并转换为十进制

java - 错误: Failed to create capture session

java - 将字符串值从 Activity 类传递到非 Activity 类