java - Does the join() call in the CompletableFuture example below block the process?

Tags: java nonblocking completable-future

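I'm trying to understand CompletableFutures and the chaining of calls that return completed futures, so I created the example below, which simulates two calls to a database.

The first method should provide a completable future holding a list of user IDs; then I need to call another method, passing a userId, to fetch the user (a string in this case).

To summarize:

  1. Get the IDs.
  2. Get the list of users corresponding to those IDs.

I created simple methods that simulate the response time by putting the thread to sleep. Please check the code below.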

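import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PipelineOfTasksExample {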

    private Map<Long, String> db = new HashMap<>();

    PipelineOfTasksExample() {
        db.put(1L, "user1");
        db.put(2L, "user2");
        db.put(3L, "user3");
        db.put(4L, "user4");
    }


    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("building the list of Ids" + " - thread: " + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    private CompletableFuture<String> fetchById(Long id) {
        CompletableFuture<String> cfId = CompletableFuture.supplyAsync(() -> db.get(id));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("fetching id: " + id + " -> " + db.get(id) + " thread: " + Thread.currentThread().getName());
        return cfId;
    }

    public static void main(String[] args) {

        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
                .thenCompose(listOfIds ->
                        CompletableFuture.supplyAsync(
                                () -> listOfIds.parallelStream()
                                        .map(id -> example.fetchById(id).join())
                                        .collect(Collectors.toList()
                                        )
                        )
                );

        System.out.println(result.join());
    }

}

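My question is: does the join call (example.fetchById(id).join()) break the non-blocking nature of the process? If so, how can I fix it?

Thanks in advance.

Accepted answer

Your example is a bit odd: returnUserIdsFromDb() slows down the main thread before any operation has even started, and likewise fetchById slows down the caller rather than the asynchronous operation, which defeats the whole purpose of asynchronous operations.

Further, instead of .thenCompose(listOfIds -> CompletableFuture.supplyAsync(() -> …)) you can simply use .thenApplyAsync(listOfIds -> …).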
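
To make the equivalence concrete, here is a minimal sketch that runs the same transformation both ways. The class name ThenApplyAsyncSketch and the helpers ids() and toNames() are hypothetical stand-ins and are not part of the code in this post; both variants use the default executor.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ThenApplyAsyncSketch {
    // Hypothetical stand-in for the asynchronous id lookup.
    static CompletableFuture<List<Long>> ids() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1L, 2L, 3L, 4L));
    }

    // Hypothetical synchronous transformation applied to the ids.
    static List<String> toNames(List<Long> ids) {
        return ids.stream().map(id -> "user" + id).collect(Collectors.toList());
    }

    // Verbose form: wrap the function in a new future and flatten it.
    static List<String> viaThenCompose() {
        return ids()
            .thenCompose(list -> CompletableFuture.supplyAsync(() -> toNames(list)))
            .join();
    }

    // Shorter form: let thenApplyAsync schedule the function asynchronously.
    static List<String> viaThenApplyAsync() {
        return ids().thenApplyAsync(ThenApplyAsyncSketch::toNames).join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenCompose());
        System.out.println(viaThenApplyAsync());
    }
}
```

Both methods yield the same list; the second simply avoids creating and flattening an intermediate future.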

So a better example might be

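import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {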
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();
        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenApplyAsync(listOfIds ->
                listOfIds.parallelStream()
                    .map(id -> example.fetchById(id).join())
                    .collect(Collectors.toList()
                ),
                POOL
            );
        System.out.println(result.join());
    }
}

which prints something like

trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 4 thread: ForkJoinPool-1-worker-2
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-3, 4 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-2, 4 threads
trigger fetching id: 1 thread: ForkJoinPool-1-worker-3
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-2, 4 threads
[user1, user2, user3, user4]

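The number of threads might be surprising at first glance.

The answer is that join() may block the thread, but if this happens inside a worker thread of a Fork/Join pool, the situation is detected and a new compensation thread is started, to maintain the configured target parallelism.

As a special case, when the default Fork/Join pool is used, the implementation may pick up new pending tasks within the join() method, to ensure progress within the same thread.

So the code will always make progress, and there is nothing wrong with calling join() occasionally if the alternatives are much more complicated, but there is a danger of excessive resource consumption when it is overused. After all, the reason to use thread pools is to limit the number of threads.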
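
The blocking case described above can be reproduced in isolation. The following is a rough sketch, assuming a pool with a parallelism of 1 and a hypothetical class name BlockingJoinSketch: a task running in the pool joins on a second task submitted to the same pool. Whether the pool starts an extra thread or the waiting worker runs the inner task itself depends on the JDK version, so the printed pool size is informational only and may differ between runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class BlockingJoinSketch {
    static String run() {
        // Assumption: a deliberately tiny pool, so the outer task occupies its only worker.
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
                CompletableFuture<String> inner = CompletableFuture.supplyAsync(() -> {
                    // Simulated slow lookup.
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
                    return "user1";
                }, pool);
                // join() inside a pool worker: the pool still makes progress.
                String value = inner.join();
                System.out.println("pool size after join: " + pool.getPoolSize());
                return value;
            }, pool);
            return outer.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The call completes even though the only configured worker is waiting, which is the progress guarantee the answer refers to.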

The alternative is to use chained dependent operations wherever possible.

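import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PipelineOfTasksExample {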
    private final Map<Long, String> db = LongStream.rangeClosed(1, 4).boxed()
        .collect(Collectors.toMap(id -> id, id -> "user"+id));

    PipelineOfTasksExample() {}

    private static <T> T slowDown(String op, T result) {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(op + " -> " + result + " thread: "
            + Thread.currentThread().getName()+ ", "
            + POOL.getPoolSize() + " threads");
        return result;
    }
    private CompletableFuture<List<Long>> returnUserIdsFromDb() {
        System.out.println("trigger building the list of Ids - thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("building the list of Ids", Arrays.asList(1L, 2L, 3L, 4L)),
            POOL);
    }
    private CompletableFuture<String> fetchById(Long id) {
        System.out.println("trigger fetching id: " + id + " thread: "
            + Thread.currentThread().getName());
        return CompletableFuture.supplyAsync(
            () -> slowDown("fetching id: " + id , db.get(id)), POOL);
    }

    static ForkJoinPool POOL = new ForkJoinPool(2);

    public static void main(String[] args) {
        PipelineOfTasksExample example = new PipelineOfTasksExample();

        CompletableFuture<List<String>> result = example.returnUserIdsFromDb()
            .thenComposeAsync(listOfIds -> {
                List<CompletableFuture<String>> jobs = listOfIds.parallelStream()
                    .map(id -> example.fetchById(id))
                    .collect(Collectors.toList());
                return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                    .thenApply(_void -> jobs.stream()
                        .map(CompletableFuture::join).collect(Collectors.toList()));
                },
                POOL
            );

        System.out.println(result.join());
        System.out.println(ForkJoinPool.commonPool().getPoolSize());
    }
}

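The difference is that all asynchronous jobs are submitted first; then a dependent action that calls join on them is scheduled, and it executes only when all jobs have completed, so these join invocations never block. Only the final join call at the end of the main method may block the main thread.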
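
The allOf-then-join pattern can be pulled out into a small reusable helper. This is only a sketch: the class AllAsListSketch and the method allAsList are hypothetical names and not part of the JDK. The demo completes the futures by hand to show that the combined future stays pending until every job is done and that the result keeps the order of the input list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllAsListSketch {
    // Combines a list of futures into one future holding all results.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> jobs) {
        return CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
            // Runs only after every job has completed, so join() returns immediately.
            .thenApply(ignored -> jobs.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> demo() {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<List<String>> all = allAsList(Arrays.asList(first, second));

        // Nothing has completed yet, so the combined future is still pending.
        if (all.isDone()) {
            throw new IllegalStateException("combined future completed too early");
        }

        // Completion order does not affect the order of the results.
        second.complete("user2");
        first.complete("user1");
        return all.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With such a helper, the body of the thenComposeAsync lambda could shrink to collecting the jobs and returning allAsList(jobs).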

So this prints something like

trigger building the list of Ids - thread: main
building the list of Ids -> [1, 2, 3, 4] thread: ForkJoinPool-1-worker-1, 1 threads
trigger fetching id: 3 thread: ForkJoinPool-1-worker-1
trigger fetching id: 2 thread: ForkJoinPool-1-worker-0
trigger fetching id: 4 thread: ForkJoinPool-1-worker-1
trigger fetching id: 1 thread: ForkJoinPool-1-worker-0
fetching id: 4 -> user4 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 3 -> user3 thread: ForkJoinPool-1-worker-0, 2 threads
fetching id: 2 -> user2 thread: ForkJoinPool-1-worker-1, 2 threads
fetching id: 1 -> user1 thread: ForkJoinPool-1-worker-0, 2 threads
[user1, user2, user3, user4]

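showing that no compensation threads had to be created, so the number of threads matches the configured target parallelism.

Note that if the actual work is done in a background thread rather than within the fetchById method itself, you no longer need a parallel stream, as there is no blocking join() call. For such scenarios, just using stream() will usually result in higher performance.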

Regarding "java - Does the join() call in the CompletableFuture example below block the process?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56413349/
