Java ExecutorService - 任务/可调用不取消/中断

标签 java android multithreading threadpool executorservice

我正在使用 Java ExecutorService (ThreadPool) 来执行任务并更新 UI,同时特定 Activity 位于前台(可见)。

问题: 我想要的是,当用户切换到另一个 Activity 时,我想停止/取消所有任务(无论是排队还是正在运行)。为此,我必须在调用 isDone() 检查 Future 对象状态后,对 ExecutorService 提交方法返回的 Future 对象使用 ExecutorService shutdown/shutdownNow 方法或 cancel(true) 。这会将相应的中断线程标志设置为 TRUE,我必须在可调用实现中检查(Thread.currentThread.isInterrupted())以确定是否被中断退出任务/线程。问题是我是否调用 ExecutorService shutdown 方法或 Future cancel(true) 方法,在这两种情况下,很少有 10 次中有 1 次它将线程中断标志设置为 TRUE,这最终导致内存泄漏等。

代码:

ThreadPool Singleton实现(cancelAll-取消任务& shutdownExecutor-关闭ExecutorService):

private static class ThreadPoolManager {

    private ExecutorService executorService;
    private List<Future> queuedFutures;
    private BlockingQueue<Runnable> blockingQueue;

    private static ThreadPoolManager instance;

    private ThreadPoolManager() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
        queuedFutures = new ArrayList<>();
        blockingQueue = new LinkedBlockingDeque<>();
        executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
    }

    static {
        instance = new ThreadPoolManager();
    }

    public static void submitItemTest(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void submitTestAll(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        cancelAll();
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void cancelAll() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
        instance.blockingQueue.clear();
        for (Future future : instance.queuedFutures) {
            if (!future.isDone()) {
                boolean cancelled = future.cancel(true);
                MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
            }
        }
        instance.queuedFutures.clear();
    }

    public static void shutdownExecutor(){
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
        instance.executorService.shutdownNow();
    }
}

可调用实现(正常迭代和检查中断的 if 子句):

private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {
                          //someWork

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }

Activity/Fragment onStop 实现(用于调用取消任务和关闭):

@Override
public void onStop() {
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
    ThreadPoolManager.cancelAll();
    ThreadPoolManager.shutdownExecutor();
    super.onStop();
}

更新:

所做的更改:

  1. 不再使用 Runnable 而不是 Callable。

  2. 现在不再对 ExecutorService 使用单例。

      private class ThreadPoolManager {
    
        private ExecutorService executorService;
        private List<Future> queuedFutures;
        private BlockingQueue<Runnable> blockingQueue;
    
        private ThreadPoolManager() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService =getNewExecutorService();
        }
    
        private ExecutorService getNewExecutorService(){
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        }
    
        private void submitItemTest(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void submitTestAll(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void cancelAll() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures) {
                if (!future.isDone()) {
                    boolean cancelled = future.cancel(true);
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
                }
            }
            queuedFutures.clear();
        }
    
        private void shutdownExecutor(){
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
            executorService.shutdownNow();
            blockingQueue.clear();
            queuedFutures.clear();
        }
    }
    

找到了罪魁祸首,但还没有找到解决方案。以下 2 是 Runnables 1 的实现,其中 1 正在运行(isInterrupted 返回 true 或出现 InterupptedException,然后任务结束),但其他则不运行。

工作可运行(我用它来测试):

new Runnable() {
          @Override
          public void run() {
                    int i=0;
                    while(!Thread.currentThread().isInterrupted()){
                        try {
                            System.out.println(i);
                            Thread.currentThread().sleep(2000);
                        } catch (InterruptedException e) {
                            MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
                            return;
                        }
                        i++;
                    }
                }
            }

不工作(我想使用的实际代码):

new Runnable(){
            @Override
            public void run() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
                        break;
                    }
                }
            }
        };

1 个可能的解决方案是使用变量( boolean 值)作为可运行对象内的中断标志,我将其视为最后的手段,但很乐意了解错误。

最佳答案

根据 ExecutorService 文档,关闭正在执行的任务是尽最大努力完成的。

因此,当您调用 ExecutorService.shutdownNow() 时,实现将尝试关闭所有当前正在执行的任务。每个任务将继续运行,直到它检测到它被中断为止。

为了确保您的线程在早期阶段达到该点,最好在循环中添加检查线程是否被中断,如下所示:

Thread.currentThread().isInterrupted();

通过在每次迭代时进行此调用,您的线程将在距实际中断很短的间隔内检测到中断。

因此,修改后的 Callable 代码将如下所示:

private Callable<Object> getTestAllCallable() {
    return new Callable<Object>() {
        @Override
        public Object call() {
            for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                if(Thread.currentThread().isInterrupted()) {
                    return null;
                }
                if(someCondition) {
                    //someWork
                } else {
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                    return null;
                }
            }
            return null;
        }
    };
}

顺便说一句,如果您不打算从 call() 方法返回任何值,则使用 Callable 是没有意义的。如果您的任务中需要参数化类型,只需创建一个参数化 Runnable 即可,如下所示:

public class ParameterizedRunnable<T> implements Runnable {
    private final T t;

    public ParameterizedRunnable(T t) {
        this.t = t;
    }

    public void run() {
        //do some work here
    }
}

关于Java ExecutorService - 任务/可调用不取消/中断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45390151/

相关文章:

java - Spring Security — 'Full authentication is required to access this resource' 表示不存在的端点

android - 如何通过jenkins运行appium测试脚本

android - Intellij IDEA 中自动切换到替换模式

android - 如何在android中对齐 toast

c++ - 增加进程的内存使用

java - 通过 TFTP 传输的文件与主机上传输的文件大小不同

java - 当不同条目缺少少量属性时,如何将 xml 解析为 HashMap

JAVA - 如何确保在执行器开始阻塞 UI 线程之前隐藏节点

java - 在 JPA QL 中,如何按项目类别选择所有国家/地区?

java - Volatile:为什么要阻止编译器重新排序代码