java - 在 Java8 中并行运行 IO 计算

标签 java java-8

我熟悉函数式编程语言,通常是 Scala 和 Javascript。我正在处理一个 Java8 项目,但不确定我应该如何运行一个项目列表/流,并使用自定义线程池对它们中的每一个并行执行一些副作用,并返回一个对象可以监听是否完成(无论是成功还是失败)。

目前我有以下代码,它似乎可以工作(我正在使用 Play 框架 Promise 实现作为返回)但它似乎并不理想,因为 ForkJoinPool 一开始并不打算用于 IO 密集型计算。

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    ForkJoinTask<F.Promise<Void>> result = pool
            .submit(() -> {
                try {
                    items.parallel().forEach(performSingleItemBackup);
                    return F.Promise.<Void>pure(null);
                } catch (Exception e) {
                    return F.Promise.<Void>throwing(e);
                }
            });

    try {
        return result.get();
    } catch (Exception e) {
        throw new RuntimeException("Unable to get result", e);
    }
}

有人可以给我一个上述功能的更惯用的实现吗?理想情况下不使用 ForkJoinPool,使用更标准的返回类型和最新的 Java8 API?不确定我应该在 CompletableFuture、CompletionStage、ForkJoinTask 之间使用什么...

最佳答案

规范的解决方案是

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}

请注意,ForkJoin 池和并行流之间的交互是您不应依赖的未指定实现细节。相比之下,CompletableFuture提供了一个专用 API 来提供 Executor .它甚至不必是 ForkJoinPool :

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}

在任何一种情况下,您都应该明确关闭执行程序,而不是依赖自动清理。

如果你需要 F.Promise<Void>结果,你可以使用

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new))
            .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
            .join();
    } finally {
        pool.shutdown();
    }
}

但请注意,这与您的原始代码一样,仅在操作完成时返回,而返回 CompletableFuture 的方法允许操作异步运行,直到调用者调用 joinget .

返回一个真正异步的Promise ,你必须包装整个操作,例如

public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
    return F.Promise.pure(stream).flatMap(items -> {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new))
                .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
                .join();
        } finally {
            pool.shutdown();
        }
    });
}

但最好还是选择一种 API,而不是在两种不同的 API 之间来回跳转。

关于java - 在 Java8 中并行运行 IO 计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48076098/

相关文章:

java - keystore 文件不存在

java - 为回文创建递归方法

java - 基于 java 8 流中先前元素的过滤

java - 重构方法引用以支持参数

Java lambda 循环

Java 8 Streams : multiple filters vs. 复杂情况

java - 接受任意次数的输入

java - 使用 Retrofit android 进行 Curl

java - Spring安全并发 session : failed to make "max-sessions" field configurable

oop - 信息隐藏和函数式编程编码风格