我正在用 Java 编写一个 Web 应用程序(即使用 JavaLite )。在这个 web 应用程序中,我有一个端点,它应该在调用时向服务器发送一堆其他请求。由于这些请求的数量可能会增加,所以我决定并行发送这些请求,使用Java 8中引入的Java Concurrency API。我并行发送多个请求的代码如下:
public List<String> searchAll(List<String> keywords) {
ExecutorService executor = Executors.newWorkStealingPool();
List<Callable<List<String>>> tasks = new ArrayList<>();
for (String key : keywords) {
tasks.add(() -> {
LOGGER.info("Sending query for key: " + key);
return sendSearchQuery(key);
});
}
List<String> all = new ArrayList<>();
try {
executor.invokeAll(tasks)
.stream()
.map(future -> {
try {
return future.get();
}
catch (Exception e) {
throw new IllegalStateException(e);
}
})
.forEach((list) ->
{
LOGGER.info("Received list: " + list);
all.addAll(list);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
return all;
}
private List<String> sendSearchQuery(String query) throws UnirestException {
long startTime = System.nanoTime();
HttpResponse<JsonNode> response = Unirest.get(SEARCH_URL)
.queryString("q", query).asString();
Map<String, Object> result = JsonHelper.toMap(response.getBody());
// Get get = Http.get(SEARCH_URL + "?q=" + query);
// Map<String, Object> result = JsonHelper.toMap(get.text());
LOGGER.info("Query received in " + (System.nanoTime() - startTime) / 1000000 + " ms for key: " + query);
return (List<String>) result.get("result");
}
这段代码的输出如下:
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: []
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149]
如您所见,我使用了两个不同的 Http 库( JavaLite Http 和 Unirest )来查看问题是否出在我使用的库上,但是情况似乎并非如此,因为它们都会产生相同的问题。
这里的问题是,第一个 n
(机器上的处理器数量)查询同时开始和结束。这是正常现象,但它们也需要比应有的时间更长的时间。假设在正常情况下单个请求需要 t
时间。在这种情况下,第一个 n
查询每个都花费大约 n * t
时间,其余查询每个花费 t
时间。我是否错误地使用了并发 API?
编辑 SEARCH_URL
上运行的服务器部署在 Azure 上,它可以处理多个请求。
我还尝试了以下方法:
- 使用
ExecutorService.newFixedThreadPool()
,但是我使用的Executor
似乎不是问题的原因。 - 调用
invokeAny()
而不是invokeAll()
,但是前者会阻塞主线程直到其中一个任务完成,并且只返回该任务的结果.
编辑 2: 因此,我试用了服务器和我目前正在使用的应用程序。奇怪的是,服务器在不同时间响应 n 个请求,但是应用程序在从第一个请求到达服务器的时间开始到第 n 时间结束的时间范围后收到这些响应响应到达应用程序。我对这种行为没有任何解释。
最佳答案
您看过为 Java 8 引入的 completableFuture 框架吗?我可以帮助您尝试异步发送所有内容。
List<CompletableFuture<List<String>>> futures = keywords.parallelStream()
.map(key -> CompletableFuture.supplyAsync(() -> sendSearchQuery(key), executor))
.collect(toList());
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allOf.join();
} catch (Exception e){
e.printStackTrace();
}
List<String> all = futures.stream().filter(CompletableFuture::isCompletedExceptionally)
.flatMap(future -> future.join().stream())
.collect(toList());
return all;
这将执行的操作是异步发送所有搜索结果,然后调用 allOf.join(),等待一切返回。
然后最终流将每个结果映射回一个列表并返回
关于java - 在 Java 8 中与 ExecutorService 并行发送查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42045251/