java - 收集所有 CompletableFuture.allOf 的可抛出对象

标签 java asynchronous promise java-8

疯狂的事情正在发生。我有一批并行异步任务,每个任务都可能抛出异常。我想执行所有这些,并收集所有可能的异常,并仅用一个异常将其包装起来。不过看起来收集了Throwable对象可能会被删除(!?),就像它被弱引用引用一样。最好通过示例来解释这一点。

示例

这是示例的完整源代码。我将在这里将其分为逻辑部分,以更好地解释我想要做的事情。如果您想尝试,只需将以下所有代码块合并到您的 IDE 中即可。

主要

我们称之为process()并传递输入数据 - 将要处理的整数数组。我们检查异常情况,然后列出所有收集异常情况。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CollectExceptions {

    public static void main(String[] args) throws Exception {
        CompletableFuture<List<Integer>> futures =
                process(Arrays.asList("1", "a", "b", "4"));

        try {
            futures.get();
        } catch (ExecutionException e) {
            BatchException batchException = (BatchException) e.getCause();
            Stream.of(batchException.exceptions).forEach(System.out::println);
        }
    }

流程

首先,我们转换输入IterableCompletableFuture<Integer>[] 。这意味着对单个输入的每个工作都会传递到 supplyAsync 。由于我们想要收集错误,因此我们使用 handle方法。请注意,exeptionally() 也会发生同样的情况。和whenComplete() .

然后我们编写一个CompletableFuture所有输入 future 。当该完成时,即处理所有输入时,我们决定应该抛出异常;如果收集到任何异常。

public static CompletableFuture<List<Integer>> process(Iterable<String> documents) {
    final List<Throwable> throwables = new ArrayList<>();

    CompletableFuture<Integer>[] allFuturesArray = StreamSupport
        .stream(documents.spliterator(), false)
        .map((document) -> CompletableFuture.supplyAsync(() -> workSync(document)).handle((list, t) -> {
            if (t != null) {
                throwables.add(t);
            }
            return list;
        }))
        .toArray(CompletableFuture[]::new);

    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(allFuturesArray);

    return allDoneFuture
        .thenApply(v -> Stream.of(allFuturesArray)
            .map(CompletableFuture::join)
            .collect(Collectors.<Integer>toList())
        )
        .whenComplete((list, throwable) -> {
            if (throwables.size() > 0) {
                throw new BatchException(throwables);
            }
        });
}

工作

这是工作方法。我们添加了一些延迟,否则您不会看到问题。还有 BatchException 的定义只是存储输入异常。

    public static Integer workSync(String string) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Integer.valueOf(string);
    }

    public static class BatchException extends RuntimeException {
        public final Throwable[] exceptions;
        public BatchException(Iterable<Throwable> exceptions) {
            this.exceptions = StreamSupport
                .stream(exceptions.spliterator(), false)
                .toArray(Throwable[]::new);
        }
    } 
}

执行和问题

程序行为不一致!当我从 IDE 运行它时(在 Debug模式下),我看到有 2 个异常,但有时其中一个异常是 null !!!

这是一个输出:

/Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/bin/java....
null
java.util.concurrent.CompletionException: java.lang.NumberFormatException: For input string: "a"
done

哇!搞什么?我们怎么有null集???看起来里面正在进行一些不安全的工作。

最佳答案

“看起来内部正在进行一些不安全的工作。”

是的...您的 throwables 列表访问不同步:)

关于java - 收集所有 CompletableFuture.allOf 的可抛出对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27842913/

相关文章:

javascript - 应用程序重新加载时的 Angular 解析

java - 使用反射可以使集合(列表)的大小小于 0 吗?

java - RichFaces 丰富 :panelMenu from RF demo causes error

java - Android Radio Group 多选问题

java - 蒙戈集合 : How to get value of nested key

javascript - 异步 waterfall : Callback already was called?

javascript - $.when.apply($, someArray) 是做什么的?

c# - 如何在 C# 中将元素异步添加到 Queue<T>?

Node.js - 如何使用回调调用异步函数?

javascript - 我可以依赖非链式后续 promise 订单吗?