java - 如何终止多线程中超时的任务?

标签 java multithreading executorservice callable futuretask

我需要创建一个库,其中包含同步和异步方法。

  • executeSynchronous() - 等待直到获得结果,然后返回结果。
  • executeAsynchronous() - 立即返回一个 Future,如果需要,可以在其他事情完成后进行处理。

我的库的核心逻辑

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将使用该 DataKey 对象构造一个 URL,并通过执行它来对该 URL 进行 HTTP 客户端调用,在我们以 JSON 字符串形式返回响应后,我们将该 JSON 字符串发送回我们的客户通过创建 DataResponse 对象来实现。有些客户会调用 executeSynchronous(),有些客户可能会调用 executeAsynchronous(),所以这就是为什么我需要在我的库中单独提供两个方法。

界面:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我的 DataClient 实现了上述 Client 接口(interface):

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this look right the way I am doing it?
            future.cancel(true); // terminating tasks that have timed out.
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

将执行实际任务的简单类:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

问题陈述:-

当我开始研究此解决方案时,我并没有终止已超时的任务。我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的 10 个线程之一)。所以我在网上做了一些研究,发现我可以通过在 future 上使用 cancel 来取消那些超时的任务,如下所示 -

future.cancel(true);

但我想确定,我在 executeSynchronous 方法中取消超时任务的方式看起来是否正确?

因为我在Future上调用cancel(),如果任务仍在队列中,这将阻止它运行,所以我不确定我在做什么对还是不对?执行此操作的正确方法是什么?

如果有更好的方法,那么有人可以提供一个例子吗?

最佳答案

如果任务仍在队列中,则只需调用 future.cancel() 即可取消它,但显然您不知道该任务是否在队列中。此外,即使您要求 future 中断任务,它也可能无法工作,因为您的任务仍然可以执行忽略线程中断状态的操作。

因此您可以使用 future.cancel(true) 但您需要确保您的任务(线程)确实考虑线程中断状态。例如,正如您提到的,您进行 http 调用,因此您可能需要在线程中断后立即关闭 http 客户端资源。

请引用下面的例子。

我尝试实现任务取消场景。通常,线程可以检查 isInterrupted() 并尝试终止自身。但是,当您使用可调用的线程池执行器并且任务实际上并不像 while(!Thread.isInterrupted()) {//执行任务} 时,这会变得更加复杂。

在此示例中,任务正在写入文件(为了保持简单,我没有使用 http 调用)。线程池执行器开始运行任务,但调用者希望在 100 毫秒后取消它。现在 future 向线程发送中断信号,但可调用任务在写入文件时无法立即检查它。因此,为了实现这一点,callable 会维护一个它将要使用的 IO 资源列表,一旦将来想要取消该任务,它只需对所有 IO 资源调用 cancel() 即可以 IOException 终止该任务然后线程完成。

public class CancellableTaskTest {

    public static void main(String[] args) throws Exception {
        CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        long startTime = System.currentTimeMillis();
        Future<String> future = threadPoolExecutor.submit(new CancellableTask());
        while (System.currentTimeMillis() - startTime < 100) {
            Thread.sleep(10);
        }
        System.out.println("Trying to cancel task");
        future.cancel(true);
    }
}

class CancellableThreadPoolExecutor extends ThreadPoolExecutor {

    public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new CancellableFutureTask<T>(callable);
    }
}

class CancellableFutureTask<V> extends FutureTask<V> {

    private WeakReference<CancellableTask> weakReference;

    public CancellableFutureTask(Callable<V> callable) {
        super(callable);
        if (callable instanceof CancellableTask) {
            this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = super.cancel(mayInterruptIfRunning);
        if (weakReference != null) {
            CancellableTask task = weakReference.get();
            if (task != null) {
                try {
                    task.cancel();
                } catch (Exception e) {
                    e.printStackTrace();
                    result = false;
                }
            }
        }
        return result;
    }
}

class CancellableTask implements Callable<String> {

    private volatile boolean cancelled;
    private final Object lock = new Object();
    private LinkedList<Object> cancellableResources = new LinkedList<Object>();

    @Override
    public String call() throws Exception {
        if (!cancelled) {
            System.out.println("Task started");
            // write file
            File file = File.createTempFile("testfile", ".txt");
            BufferedWriter writer = new BufferedWriter(new FileWriter(file));
            synchronized (lock) {
                cancellableResources.add(writer);
            }
            try {
                long lineCount = 0;
                while (lineCount++ < 100000000) {
                    writer.write("This is a test text at line: " + lineCount);
                    writer.newLine();
                }
                System.out.println("Task completed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writer.close();
                file.delete();
                synchronized (lock) {
                    cancellableResources.clear();
                }
            }
        }
        return "done";
    }

    public void cancel() throws Exception {
        cancelled = true;
        Thread.sleep(1000);
        boolean success = false;
        synchronized (lock) {
            for (Object cancellableResource : cancellableResources) {
                if (cancellableResource instanceof Closeable) {
                    ((Closeable) cancellableResource).close();
                    success = true;
                }
            }
        }
        System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
    }
}

对于您的 REST Http 客户端相关要求,您可以修改工厂类,如下所示 -

public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {

    private List<Object> cancellableResources;

    public CancellableSimpleClientHttpRequestFactory() {
    }

    public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
        this.cancellableResources = cancellableResources;
    }

    protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
        HttpURLConnection connection = super.openConnection(url, proxy);
        if (cancellableResources != null) {
            cancellableResources.add(connection);
        }
        return connection;
    }
}

在这里,您需要在可运行任务中创建 RestTemplate 时使用此工厂。

    RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));

确保您传递的可取消资源列表与您在 CancellableTask 中维护的列表相同。

现在您需要像这样修改 CancellableTask 中的 cancel() 方法 -

synchronized (lock) {
    for (Object cancellableResource : cancellableResources) {
        if (cancellableResource instanceof HttpURLConnection) {
            ((HttpURLConnection) cancellableResource).disconnect();
            success = true;
        }
    }
}

关于java - 如何终止多线程中超时的任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29130563/

相关文章:

multithreading - Spring data Couchbase生成的查询方法不是线程安全的

java - Kotlin 的 Flyway 迁移失败

java - Json 映射异常无法从 START_ARRAY token 中反序列化实例

Java 指令重新排序示例不起作用

java - ExecutorCompletionService 缺少 invokeAll 接口(interface)

java - 在长时间运行的应用程序中运行并行任务

JAVA - ScheduledExecutorService 不退出线程

java - Facelets 在哪里?

java - 从列表中删除多余的文件夹路径

c - Pthread_join() 卡在线程数组上