java - 在 Java Future 上使用 isDone 和 Cancel 轮询而不是阻塞 get

标签 java multithreading future

我正在处理一些无法立即重构的遗留代码。

该代码使用阻塞式 Java Future。它使用 future.get(withTimeOut,...)。这意味着我们需要有一个合适大小的线程池才能足够响应。因为调用将被阻止,直到他们完成或超时。

问题: 我正在考虑捕获 future 并将其放入一个数据结构中,该数据结构将意识到任务执行的开始。然后有一个专用的线程或池,它将遍历数据结构并检查 future.isDone 或是否已超过超时限制。如果是,它可以获取结果或取消执行。这样就不需要很多线程。它是正确的实现还是根本不推荐?

提前致谢。

编辑:

只是为了提供更多上下文。这些线程用于记录下游服务。我们真的不关心响应,但我们不希望连接挂起。因此,我们需要捕获 future 并确保它被取消或超时。

这是我在问完问题后写的一个基本模拟。

@Component
public class PollingService {

    private ExecutorService executorService = Executors.newFixedThreadPool(1);
    PoorMultiplexer poorMultiplexer = new PoorMultiplexer();
    private ConcurrentMap<Integer, Map<Future, Long>> futures = new ConcurrentHashMap<>();

    public void startHandler(){
        Thread handler = new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        //This should be handled better. If there is not anything stop and re-start it later.
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    for(Iterator<ConcurrentMap.Entry<Integer, Map<Future, Long>>> it = futures.entrySet().iterator(); it.hasNext();){
                        ConcurrentMap.Entry<Integer, Map<Future, Long>> entry = it.next();
                        Map<Future, Long> futureMap = entry.getValue();
                        boolean isProcessed = false;
                        if(futureMap.keySet().iterator().next().isDone()){
                            //mark completed
                            isProcessed = true;
                        }

                        if(futureMap.values().iterator().next() < (300 + System.currentTimeMillis()) && !isProcessed){
                            //cancel
                            futureMap.keySet().iterator().next().cancel(true);
                            isProcessed = true;
                        }

                        if(isProcessed){
                            futures.remove(entry.getKey());
                            System.out.println("Completed : " + entry.getKey());
                        }

                    }

                    System.out.println("Run completed");
                }
            }
        });

        handler.start();
    }

    public void run(int i) throws InterruptedException, ExecutionException{
        System.out.println("Starting : " + i);

        poorMultiplexer.send(new Runnable() {
            @Override
            public void run() {
                long startTime = System.currentTimeMillis();
                Future future = poorMultiplexer.send(execute());

                Map<Future, Long> entry = new HashMap<>();
                entry.put(future, startTime);
                futures.put(i, entry);
                System.out.println("Added : " + i);
            }
        });
    }

    public void stop(){
        executorService.shutdown();
    }

    public Runnable execute(){
        Worker worker = new Worker();
        return worker;
    }
}


//This is a placeholder for a framework
class PoorMultiplexer {
    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    public Future send(Runnable task){
        return executorService.submit(task);
    }
}


class Worker implements Runnable{

    @Override
    public void run() {
        //service call here
    }

}

最佳答案

使用单独的线程异步轮询一组 Futures 对我来说确实是一个合理的实现。也就是说,如果您能够添加库依赖项,您可能会发现切换到 Guava 的 ListenableFuture 会更容易。 ,因为 Guava 提供了大量用于执行异步工作的实用程序。

关于java - 在 Java Future 上使用 isDone 和 Cancel 轮询而不是阻塞 get,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37908436/

相关文章:

c# - 后台 worker 调用之间是否保留了线程本地存储?

c - 当 `func`结束时线程会自动退出吗?

c++ - 使用 "futures"或类似范例开发 C++ 并发库

java - 使用 future 和 completableFuture 中断读取方法

java - 带有涉及两个 DAO 的遗留代码的 Spring 事务

java - 使用 Spring Boot + Spring Security 提供静态资源

java - 如何用java提取字符串中的特定字母

SQL 注释中的 Java Regex 查找/替换模式

javascript - 如何同时发布多个axios请求?

Scala - 元组上的 Future.sequence