java - CompletableFutures 的 Future 上的 get 方法

标签 java multithreading future completable-future

我正在用java做一些多线程工作,我有一个关于 future 的问题get方法。

我有一个ExecutorSevice它提交 n Callable。提交返回 Future它被添加到列表中(稍后在一些代码中)。可调用的 call方法循环遍历列表,并对列表中的每个项目 someMethod被调用并返回 CompletableFuture<T> 。然后,Completable future 被添加到 map 中。

循环结束call返回包含一堆 CompletableFuture 的映射。

因此,初始列表 ( uglyList ) 包含 n 个 <String, CompletableFuture<String>> 的映射。

我的问题是,当我调用get时在 uglyList 的元素上执行将被阻止,直到整个列表(作为参数传递给 Callable)被访问并且所有 CompletableFutures已经插入到 map 中了,对吧?

因为,我有一个疑问,有没有可能get在此示例中,还等待 CompletableFutures 的完成在 map 上?我不知道如何测试这个


public class A {

    private class CallableC implements Callable<Map<String, CompletableFuture<String>>> {
        private List<String> listString;

        Map<String, CompletableFuture<String>> futureMap;

        public CallableC(List<String> listString) throws Exception {        
            this.listString = listString;
            this.futureMap = new HashMap<>();
        }

        @Override
        public Map<String, CompletableFuture<String>> call() throws Exception {

            for (int j = 0; j < listString.size(); ++j) {
                CompletableFuture<String> future = someMethod();        
                futureMap.put(listString.get(i), future);
            }

            return futureMap;
        }
    }


    public void multiThreadMethod(List<String> items) throws Exception {

        List<Future<Map<String, CompletableFuture<String>>>> uglyList = new ArrayList<>();

        int coresNumber = (Runtime.getRuntime().availableProcessors());

                // This method split a List in n subList
        List<List<String>> splittedList = split(items, coresNumber);

        ExecutorService executor = Executors.newFixedThreadPool(coresNumber);

        try {

            for (int i = 0; i < coresNumber; ++i) {
                threads.add(executor.submit(new CallableC(splittedList.get(i),)));
            }

            for (int i = 0; i < coresNumber; ++i) {
                uglyList.get(i).get(); //here
            }


        } catch (Exception e) {
                     // YAY errors
        }
        finally {
            executor.shutdown();
        }
    }
}

最佳答案

when i call get on a element of uglyList the execution is blocked until the entire list (passed as parameter to the Callable) is visited and all CompletableFutures have been inserted in the map, right?

是的,这是正确的。 get() 将等待 Callable 的完成,在您的情况下,这意味着 CallableC#call() 方法的结束,当 CompletableFutures 全部创建并插入到 futureMap 中时,将在 for 循环结束时到达该方法,无论这些 CompletableFuture 的状态/完成情况如何。

it is possible that the get in this example, also waits the completion of the CompletableFutures in the map?

可以实现这一点,但需要额外的代码。

在代码中执行此操作的最低方法是在任务内的 CompletableFuture 上添加 join 调用。

@Override
    public Map<String, CompletableFuture<String>> call() throws Exception {

        for (int j = 0; j < listString.size(); ++j) {
            CompletableFuture<String> future = someMethod();        
            futureMap.put(listString.get(j), future);
        }
        // This will wait for all the futures inside the map to terminate
        try {
            CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            // ignored - the status will be checked later on
        }

        return futureMap;
    }

另一种方法是在最后等待(为了简洁起见,我跳过异常处理):

for (int i = 0; i < coresNumber; ++i) {
    threads.add(executor.submit(new CallableC(splittedList.get(i),)));
}
// This will wait for all the futures to terminate (thanks to f.get())
// And then, all the completable futures will be added to an array of futures that will also be waited upon
CompletableFuture.allOf(
    uglyList.stream().flatMap(f -> f.get().values().stream()).toArray(CompletableFuture[]::new)
).join();

但是如果对你来说可能的话,我会仅使用可完成的 future 来简化整个代码,除非你有充分的理由以你的方式中断任务(使用列表列表、专用执行器等......)。这可能看起来像这样(可能有一些错别字)

List<String> items = ...
ConcurrentMap<String, String> resultsByItem = new ConcurrentHashMap<>();

Future<Void> allWork = CompletableFuture.allOf(
    items.stream()
        .map(item -> someWork(item) // Get a completable future for this item
                     .andThen(result -> resultsByItem.put(item, result)) // Append the result to the map
        ).toArray(CompletableFuture[]::new)
);
allWork.join();
// Your futureMap is completed.

这将替换问题中的整个代码。

关于java - CompletableFutures 的 Future 上的 get 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58729011/

相关文章:

android - 基本Android AsyncTask,执行doInBackground()时发生错误

c++ - 将来尝试引用已删除的函数

java - 如何在没有任何阻塞的情况下在 Java 8 中执行依赖任务

java - JSTL EL 条件错误

java - Maven 构建的泛型问题

java - 错误 : Could not find or load main class org. apache.hadoop.hdfs.server.datanode.DataNode

dart - 我什么时候应该使用 FutureBuilder?

javascript - JSP:提交前确认

Android守护线程测试

c - 使线程获得相等的时间片