我正在尝试使用 futures 进行并发 api 调用。代码:
private void init() throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
List<ApiResponse> responses = fetchAllUsingFuture(ids, 3);
log.info(responses.toString());
Long finish = System.currentTimeMillis();
log.info(MessageFormat.format("Process duration: {0} in ms", finish-start));
}
private List<ApiResponse> fetchAllUsingFuture(List<String> ids, int threadCount) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
List<List<String>> chunks = Utils.splitToChunks(ids, threadCount);
List<Future<List<ApiResponse>>> futures = new ArrayList<>();
chunks.forEach(chunk -> {
futures.add(wrapFetchInFuture(chunk));
});
Future<List<ApiResponse>> resultFuture = executorService.submit(() -> {
List<ApiResponse> responses = new ArrayList<>();
futures.forEach(future -> {
try {
responses.addAll(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
return responses;
});
executorService.shutdown();
return resultFuture.get();
}
private Future<List<ApiResponse>> wrapFetchInFuture(List<String> ids) {
return new FutureTask<>(() -> {
List<ApiResponse> responses = new ArrayList<>();
ids.forEach(id -> {
responses.add(fetchData(id));
});
return responses;
});
}
private ApiResponse fetchData(String id) {
ResponseEntity<ApiResponse> response = restTemplate.getForEntity(id, ApiResponse.class);
log.info(MessageFormat.format("Fetching from {0}", id));
ApiResponse body = response.getBody();
log.info(MessageFormat.format("Retrieved {0}", body));
return body;
}
它不会执行,应用程序启动然后就挂起。 future 没有兑现。感谢所有建议。 附:我知道使用 CompletableFuture 可以更轻松地完成此操作,我只是想知道如何使用 Futures 来做到这一点
最佳答案
在问题的原始版本中,您正在创建一个 FutureTasks
列表,但从未将它们发送到 ExecutorService
来运行它们。任务永远不会完成,因此 Future.get
永远阻塞。
在问题的更新版本中,您已将等待的代码作为任务放入执行程序服务中。 FutureTasks 永远不会运行,因此 FutureTask.get
仍将永远阻塞。
我建议您将fetchAllUsingFuture
中的代码更改为:
List<Callable<List<ApiResponse>>> tasks = new ArrayList<>();
chunks.forEach(chunk -> {
tasks.add(wrapFetchInCallable(chunk));
});
List<Future<List<ApiResponse>>> futures = executorService.invokeAll(tasks);
其中 wrapFetchInCallable
创建一个 Callable
而不是 FutureTask
:
private static Callable<List<ApiResponse>> wrapFetchInCallable(List<String> ids) {
return () -> {
List<ApiResponse> responses = new ArrayList<>();
ids.forEach(id -> {
responses.add(fetchData(id));
});
return responses;
};
}
关于java - 从 Java 中的 Futures 列表中收集结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63204221/