java - 为什么 CompletableFuture 在单独的流中加入/获取比使用一个流更快

标签 java java-8 java-stream completable-future

对于以下程序,我试图弄清楚为什么使用 2 个不同的流并行执行任务,并使用相同的流并在 Completable future 上调用 join/get 会使它们花费更长的时间,相当于按顺序处理它们一样)。

public class HelloConcurrency {

    private static Integer sleepTask(int number) {
        System.out.println(String.format("Task with sleep time %d", number));
        try {
            TimeUnit.SECONDS.sleep(number);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return -1;
        }
        return number;
    }

    public static void main(String[] args) {
        List<Integer> sleepTimes = Arrays.asList(1,2,3,4,5,6);
        System.out.println("WITH SEPARATE STREAMS FOR FUTURE AND JOIN");
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = sleepTimes.stream()
                .map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService)
                        .exceptionally(ex -> { ex.printStackTrace(); return -1; }))
                .collect(Collectors.toList());
        executorService.shutdown();
        List<Integer> result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        long finish = System.currentTimeMillis();
        long timeElapsed = (finish - start)/1000;
        System.out.println(String.format("done in %d seconds.", timeElapsed));
        System.out.println(result);

        System.out.println("WITH SAME STREAM FOR FUTURE AND JOIN");
        ExecutorService executorService2 = Executors.newFixedThreadPool(6);
        start = System.currentTimeMillis();
        List<Integer> results = sleepTimes.stream()
                .map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService2)
                        .exceptionally(ex -> { ex.printStackTrace(); return -1; }))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        executorService2.shutdown();
        finish = System.currentTimeMillis();
        timeElapsed = (finish - start)/1000;
        System.out.println(String.format("done in %d seconds.", timeElapsed));
        System.out.println(results);
    }
}

输出

WITH SEPARATE STREAMS FOR FUTURE AND JOIN
Task with sleep time 6
Task with sleep time 5
Task with sleep time 1
Task with sleep time 3
Task with sleep time 2
Task with sleep time 4
done in 6 seconds.
[1, 2, 3, 4, 5, 6]
WITH SAME STREAM FOR FUTURE AND JOIN
Task with sleep time 1
Task with sleep time 2
Task with sleep time 3
Task with sleep time 4
Task with sleep time 5
Task with sleep time 6
done in 21 seconds.
[1, 2, 3, 4, 5, 6]

最佳答案

这两种方法有很大不同,让我尝试解释清楚

第一种方法:在第一种方法中,您将启动所有 6 个任务的所有 Async 请求,然后对每个任务调用 join 函数得到结果

第二种方法:但是在第二种方法中,您在为每个任务旋转Async请求后立即调用join。例如,在调用 join 为任务 1 旋转 Async 线程后,请确保该线程完成任务,然后仅使用 异步线程

注意:另一方面,如果您清楚地观察输出,在第一种方法中,输出以随机顺序出现,因为所有六个任务都是异步执行的。但在第二种方法中,所有任务都是按顺序依次执行的。

相信您已经了解流map操作是如何执行的,或者您可以从 here 获取更多信息或here

To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.

关于java - 为什么 CompletableFuture 在单独的流中加入/获取比使用一个流更快,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61938810/

相关文章:

java - IntStream 的随机排列

java - 在 Java 8 中递归展平嵌套映射的值

java - 当伊甸园空间和幸存者空间都满了时会发生什么?

java - 如何使用坐标显示带有多个按钮的图像;左、上、右、下

java - .collect(Collectors.toList()) 和 Java 方法上的流

java - 如何根据Java 8可选的某些条件更新现有对象

java - 官方文档哪里说Java的并行流操作使用fork/join?

java - 向 Kafka Producer 发送数据

java - JAXB,为所有子元素设置 namespace

java - 如何定义自定义组合谓词?