我熟悉函数式编程语言,通常是 Scala 和 Javascript。我正在处理一个 Java8 项目,但不确定我应该如何运行一个项目列表/流,并使用自定义线程池对它们中的每一个并行执行一些副作用,并返回一个对象可以监听是否完成(无论是成功还是失败)。
目前我有以下代码,它似乎可以工作(我正在使用 Play 框架 Promise 实现作为返回)但它似乎并不理想,因为 ForkJoinPool 一开始并不打算用于 IO 密集型计算。
public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
ForkJoinTask<F.Promise<Void>> result = pool
.submit(() -> {
try {
items.parallel().forEach(performSingleItemBackup);
return F.Promise.<Void>pure(null);
} catch (Exception e) {
return F.Promise.<Void>throwing(e);
}
});
try {
return result.get();
} catch (Exception e) {
throw new RuntimeException("Unable to get result", e);
}
}
有人可以给我一个上述功能的更惯用的实现吗?理想情况下不使用 ForkJoinPool,使用更标准的返回类型和最新的 Java8 API?不确定我应该在 CompletableFuture、CompletionStage、ForkJoinTask 之间使用什么...
最佳答案
规范的解决方案是
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
请注意,ForkJoin 池和并行流之间的交互是您不应依赖的未指定实现细节。相比之下,CompletableFuture
提供了一个专用 API 来提供 Executor
.它甚至不必是 ForkJoinPool
:
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
在任何一种情况下,您都应该明确关闭执行程序,而不是依赖自动清理。
如果你需要 F.Promise<Void>
结果,你可以使用
public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
}
但请注意,这与您的原始代码一样,仅在操作完成时返回,而返回 CompletableFuture
的方法允许操作异步运行,直到调用者调用 join
或 get
.
返回一个真正异步的Promise
,你必须包装整个操作,例如
public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
return F.Promise.pure(stream).flatMap(items -> {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
});
}
但最好还是选择一种 API,而不是在两种不同的 API 之间来回跳转。
关于java - 在 Java8 中并行运行 IO 计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48076098/