我正在学习并尝试将 CompletableFuture 应用于我的问题陈述。我有一个正在迭代的项目列表。
Prop 是一个只有两个属性 prop1 和 prop2 的类,分别是 getter 和 setter。
List<Prop> result = new ArrayList<>();
for ( Item item : items ) {
item.load();
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
但是,这里的 item.load() 是一个阻塞调用。所以,我想使用 CompletableFuture ,如下所示 -
for (Item item : items) {
CompletableFuture<Prop> prop = CompletableFuture.supplyAsync(() -> {
try {
item.load();
return item;
} catch (Exception e) {
logger.error("Error");
return null;
}
}).thenApply(item1 -> {
try {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
return temp;
} catch (Exception e) {
}
});
}
但我不确定如何等待所有项目加载,然后聚合并返回结果。
我在实现 CompletableFutures 的方式上可能是完全错误的,因为这是我的第一次尝试。如有错误请见谅。预先感谢您的帮助。
最佳答案
您使用 CompletableFuture
的方法存在两个问题。
首先,你说 item.load()
是一个阻塞调用,因此 CompletableFuture
的默认执行器不适合它,因为它试图实现并行级别与 CPU 核心数量相匹配。您可以通过将不同的 Executor
传递给 CompletableFuture
的异步方法来解决此问题,但您的 load()
方法不会返回以下值你的后续操作依赖。因此,使用 CompletableFuture
会使设计变得复杂,但没有任何好处。
您可以异步执行 load()
调用,并仅使用 ExecutorService
等待其完成,然后按原样循环(无需已执行的 >load()
操作,当然):
ExecutorService es = Executors.newCachedThreadPool();
es.invokeAll(items.stream()
.map(i -> Executors.callable(i::load))
.collect(Collectors.toList()));
es.shutdown();
List<Prop> result = new ArrayList<>();
for(Item item : items) {
Prop temp = new Prop();
// once the item is loaded, get its properties
temp.setProp1(item.getProp1());
temp.setProp2(item.getProp2());
result.add(temp);
}
return result;
您可以通过选择执行器来控制并行度,例如您可以使用 Executors.newFixedThreadPool(numberOfThreads) 代替无界线程池。
关于java - 如何聚合循环中进行 CompletableFuture 调用的结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44895343/