java - 在 Java 8 中与 ExecutorService 并行发送查询

标签 java http concurrency parallel-processing java-8

我正在用 Java 编写一个 Web 应用程序(即使用 JavaLite )。在这个 web 应用程序中,我有一个端点,它应该在调用时向服务器发送一堆其他请求。由于这些请求的数量可能会增加,所以我决定并行发送这些请求,使用Java 8中引入的Java Concurrency API。我并行发送多个请求的代码如下:

public List<String> searchAll(List<String> keywords) {
    ExecutorService executor = Executors.newWorkStealingPool();
    List<Callable<List<String>>> tasks = new ArrayList<>();
    for (String key : keywords) {
        tasks.add(() -> {
            LOGGER.info("Sending query for key: " + key);
            return sendSearchQuery(key);
        });
    }
    List<String> all = new ArrayList<>();
    try {
        executor.invokeAll(tasks)
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
                .forEach((list) ->
                {
                    LOGGER.info("Received list: " + list);
                    all.addAll(list);
                });
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return all;
}

private List<String> sendSearchQuery(String query) throws UnirestException {
    long startTime = System.nanoTime();
    HttpResponse<JsonNode> response = Unirest.get(SEARCH_URL)
            .queryString("q", query).asString();
    Map<String, Object> result = JsonHelper.toMap(response.getBody());
//    Get get = Http.get(SEARCH_URL + "?q=" + query);
//    Map<String, Object> result = JsonHelper.toMap(get.text());
    LOGGER.info("Query received in " + (System.nanoTime() - startTime) / 1000000 + " ms for key: " + query);
    return (List<String>) result.get("result");
}

这段代码的输出如下:

[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: []
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149]

如您所见,我使用了两个不同的 Http 库( JavaLite HttpUnirest )来查看问题是否出在我使用的库上,但是情况似乎并非如此,因为它们都会产生相同的问题。

这里的问题是,第一个 n(机器上的处理器数量)查询同时开始和结束。这是正常现象,但它们也需要比应有的时间更长的时间。假设在正常情况下单个请求需要 t 时间。在这种情况下,第一个 n 查询每个都花费大约 n * t 时间,其余查询每个花费 t 时间。我是否错误地使用了并发 API?

编辑 SEARCH_URL 上运行的服务器部署在 Azure 上,它可以处理多个请求。

我还尝试了以下方法:

  • 使用 ExecutorService.newFixedThreadPool(),但是我使用的 Executor 似乎不是问题的原因。
  • 调用 invokeAny() 而不是 invokeAll(),但是前者会阻塞主线程直到其中一个任务完成,并且只返回该任务的结果.

编辑 2: 因此,我试用了服务器和我目前正在使用的应用程序。奇怪的是,服务器在不同时间响应 n 个请求,但是应用程序在从第一个请求到达服务器的时间开始到第 n 时间结束的时间范围后收到这些响应响应到达应用程序。我对这种行为没有任何解释。

最佳答案

您看过为 Java 8 引入的 completableFuture 框架吗?我可以帮助您尝试异步发送所有内容。

List<CompletableFuture<List<String>>> futures = keywords.parallelStream()
            .map(key -> CompletableFuture.supplyAsync(() -> sendSearchQuery(key), executor))
            .collect(toList());

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allOf.join();
} catch (Exception e){
    e.printStackTrace();
}

List<String> all = futures.stream().filter(CompletableFuture::isCompletedExceptionally)
            .flatMap(future -> future.join().stream())
            .collect(toList());

return all;

这将执行的操作是异步发送所有搜索结果,然后调用 allOf.join(),等待一切返回。

然后最终流将每个结果映射回一个列表并返回

关于java - 在 Java 8 中与 ExecutorService 并行发送查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42045251/

相关文章:

java - 从 1.6 迁移到 1.7(基于 maven 的项目)后,无法在 Jetbrains Intellij 中使用 Java 1.7 进行编译

java - 方法返回 Double 而不是 Integer

java - 如何将字节数组转换为 DSA 私钥?

android - 如何为 Http Get 请求设置 cookie?

python - 多处理 - 共享一个复杂的对象

java - 拒绝同线程可重入但允许异线程可重入的锁

java - 使用 Java unsafe 将 char 数组指向内存位置

http - 在电子邮件中使用协议(protocol)相对 URL 是否安全?

c# - 如何仅在向其端口发出 HTTP 请求时启动 Windows Web 服务?

c - 从状态 Controller 检索状态的最佳方法是什么?