java - future.isDone returns false even if the task is done

Tags: java multithreading future

I have run into a tricky situation: future.isDone() returns false even though the thread has finished.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataAccessor {
    private static ThreadPoolExecutor executor;
    private int timeout = 100000;
    static {
        executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<String>();
        for(int i=0; i<20; i++){
            requests.add("request:"+i);
        }
        DataAccessor dataAccessor = new DataAccessor();

        List<ProcessedResponse> results = dataAccessor.getDataFromService(requests);
        for(ProcessedResponse response:results){
            System.out.println("response"+response.toString()+"\n");
        }
        executor.shutdown();
    }

    public List<ProcessedResponse> getDataFromService(List<String> requests) {
        final CountDownLatch latch = new CountDownLatch(requests.size());
        List<SubmittedJob> submittedJobs = new ArrayList<SubmittedJob>(requests.size());
        for (String request : requests) {
            Future<ProcessedResponse> future = executor.submit(new GetAndProcessResponse(request, latch));
            submittedJobs.add(new SubmittedJob(future, request));
        }
        try {
            if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                // some of the jobs not done
                System.out.println("some jobs not done");
            }
        } catch (InterruptedException e1) {
            // take care, or cleanup
            for (SubmittedJob job : submittedJobs) {
                job.getFuture().cancel(true);
            }
        }
        List<ProcessedResponse> results = new LinkedList<DataAccessor.ProcessedResponse>();
        for (SubmittedJob job : submittedJobs) {
            try {
                // before doing a get you may check if it is done
                if (!job.getFuture().isDone()) {
                    // cancel job and continue with others
                    job.getFuture().cancel(true);
                    continue;
                }
                ProcessedResponse response = job.getFuture().get();
                results.add(response);
            } catch (ExecutionException cause) {
                // exceptions occurred during execution, if any
            } catch (InterruptedException e) {
                // take care
            }
        }
        return results;
    }

    private class SubmittedJob {
        final String request;
        final Future<ProcessedResponse> future;

        public Future<ProcessedResponse> getFuture() {
            return future;
        }

        public String getRequest() {
            return request;
        }

        SubmittedJob(final Future<ProcessedResponse> job, final String request) {
            this.future = job;
            this.request = request;
        }
    }

    private class ProcessedResponse {
        private final String request;
        private final String response;

        ProcessedResponse(final String request, final String response) {
            this.request = request;
            this.response = response;
        }

        public String getRequest() {
            return request;
        }

        public String getResponse() {
            return response;
        }

        public String toString(){
            return "[request:"+request+","+"response:"+ response+"]";
        }
    }

    private class GetAndProcessResponse implements Callable<ProcessedResponse> {
        private final String request;
        private final CountDownLatch countDownLatch;

        GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
            this.request = request;
            this.countDownLatch = countDownLatch;
        }

        public ProcessedResponse call() {
            try {
                return getAndProcessResponse(this.request);
            } finally {
                countDownLatch.countDown();
            }
        }

        private ProcessedResponse getAndProcessResponse(final String request) {
            // do the service call
            // ........
            if("request:16".equals(request)){
                throw (new RuntimeException("runtime"));
            }
            return (new ProcessedResponse(request, "response.of." + request));
        }
    }
}

When I call future.isDone() it returns false, even though countDownLatch.await() returned true. Any ideas? Also note that countDownLatch.await() returns immediately when this happens.

If you find the formatting unreadable, view the code here: http://tinyurl.com/7j6cvep

Best answer

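The issue is most likely one of timing. The latch is released before all of the tasks are actually complete as far as the Future is concerned, because the countDown() call happens inside the call() method. The Future is only marked done after call() has returned, so there is a short window in which the latch is already open but isDone() still returns false.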
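The window described above can be made visible with a small sketch. It is not the asker's code: the class name LatchTimingDemo and the second latch named hold are hypothetical, and hold exists only to stretch the gap between countDown() and the return from call() so that the effect is reproducible rather than a race.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LatchTimingDemo {

    // Returns { isDone() right after the latch opened, isDone() after get() returned }.
    static boolean[] observe() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final CountDownLatch finished = new CountDownLatch(1); // counted down inside call()
        final CountDownLatch hold = new CountDownLatch(1);     // hypothetical: widens the gap
        try {
            Future<String> future = pool.submit(new Callable<String>() {
                public String call() throws Exception {
                    finished.countDown(); // the latch opens here...
                    hold.await();         // ...but call() has not returned yet
                    return "ok";
                }
            });

            finished.await();                      // the latch says "finished"
            boolean doneAtLatch = future.isDone(); // false: call() is still running

            hold.countDown();                      // let call() return
            future.get();                          // returns only once the Future is complete
            boolean doneAfterGet = future.isDone();

            return new boolean[] { doneAtLatch, doneAfterGet };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] seen = observe();
        System.out.println("isDone() when the latch opened: " + seen[0]);
        System.out.println("isDone() after get() returned:  " + seen[1]);
    }
}
```

In the question's code the gap is much smaller (the finally block runs just before call() returns), which is why the problem appears only intermittently there.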

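You are basically reimplementing what a CompletionService does (the implementation is ExecutorCompletionService), so I would recommend using that instead. You can use its poll(timeout) method to retrieve the results. Just keep track of the total elapsed time and make sure that on each call you reduce the timeout to the total time remaining.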
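A minimal sketch of that suggestion follows, under stated assumptions: the class name CompletionServiceSketch, the method fetchAll, and the use of plain String results in place of ProcessedResponse are illustrative choices, not part of the original answer. The simulated failure for "request:16" is borrowed from the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CompletionServiceSketch {

    // Collects whatever results arrive within one overall time budget.
    static List<String> fetchAll(List<String> requests, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CompletionService<String> service = new ExecutorCompletionService<String>(pool);
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (final String request : requests) {
            futures.add(service.submit(new Callable<String>() {
                public String call() {
                    // Stand-in for the service call; one request fails as in the question.
                    if ("request:16".equals(request)) {
                        throw new RuntimeException("runtime");
                    }
                    return "response.of." + request;
                }
            }));
        }

        List<String> results = new ArrayList<String>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (int i = 0; i < requests.size(); i++) {
                // Shrink the timeout on every call to the time still remaining.
                long remaining = Math.max(0L, deadline - System.nanoTime());
                Future<String> done = service.poll(remaining, TimeUnit.NANOSECONDS);
                if (done == null) {
                    break; // overall budget used up; stop waiting
                }
                try {
                    results.add(done.get()); // already complete, so this does not block
                } catch (ExecutionException e) {
                    // the task threw; skip it and continue with the others
                }
            }
        } finally {
            for (Future<String> future : futures) {
                future.cancel(true); // no effect on tasks that already completed
            }
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> requests = new ArrayList<String>();
        for (int i = 0; i < 20; i++) {
            requests.add("request:" + i);
        }
        for (String response : fetchAll(requests, 100000)) {
            System.out.println(response);
        }
    }
}
```

Because poll() only hands back futures that have already completed, there is no need for a separate latch or an isDone() check, and results arrive in completion order rather than submission order.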

This question, "java - future.isDone returns false even if the task is done", corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/9604713/
