java - 如何使用 invokeAll() 让所有线程池完成任务?

标签 java multithreading arraylist executorservice future

    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

现在我希望池 invokeAll 在进入 for 循环之前执行所有任务,但是当我运行这个程序时,for 循环在 invokeAll 之前执行并抛出此异常:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run

最佳答案

ExecutorService 的工作方式是,当您调用 invokeAll 时,它会等待所有任务完成:

Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress.1(emphasis added)

这意味着您的任务已全部完成,但有些任务可能引发了异常。此异常是 Future 的一部分 - 调用 get 会导致异常被重新抛出并包裹在 ExecutionException 中。

来自你的堆栈跟踪

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

您可以看到确实如此。您的一项任务因 NPE 而失败。 ExecutorService 捕获了异常,并在您调用 Future.get 时通过抛出 ExecutionException 来告诉您。

现在,如果您想在任务完成时执行任务,您需要一个 ExecutorCompletionService。这充当一个 BlockingQueue,允许您在任务完成时轮询任务。

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

在此示例中,我有一个任务在 ExecutorCompletionService 上调用 take,该任务在 Future 可用时获取它们,然后我提交ExecutorCompletionService 的任务。

这将允许您在失败时立即获取失败的任务,而不必等待所有任务一起失败或成功。

唯一的复杂之处是很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的。在这种情况下,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次。更通用的方法是从 submit 方法中收集 Future,然后遍历它们以查看是否一切都已完成。

关于java - 如何使用 invokeAll() 让所有线程池完成任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18202388/

相关文章:

java - 当使用 ArrayList(Collection<? extends E> c) 作为复制构造函数时,我收到两个错误

java - 如何在简单的 Spring Boot 应用程序中 Autowiring 扩展 CrudRepository 的存储库?

java - Android Java 中的 BBCODE 到 HTML

python - 多线程和多处理线程池之间的区别?

java - 如何在 Java 中获取线程信息/统计信息

java - 在android中创建一个空的ArrayList

java - 为什么 useDelimiter 不工作 java

java - 重命名 Derby 架构

java - 海明码 : Number of parity bits

java - 线程如何知道前面有join方法