java - 从 ThreadPoolTask​​Executor 获取可调用或将 Runnable 转换为 Callable

标签 java multithreading spring threadpoolexecutor

我正在使用 ThreadPoolTaskExecutor用于执行我的任务,这些任务是 Callable 的实现界面。如果任务仍在池中(监控),我只想及时检查。怎么做?我知道我可以从 ThreadPoolExecutor 排队但是如何将 Runnable 转换为 Callable?

基本上我有这个可调用

public interface IFormatter extends Callable<Integer>{
    Long getOrderId();
}

我是这样执行的

ThreadPoolExecutor.submit(new Formatter(order));

最后我想在一些异步方法中循环遍历 ExecutorService 队列并检查带有 orderId 的线程是否仍然存在。

最佳答案

this answer 中所述, 你可以控制 FutureTask包装 Callable通过手动创建它并通过 execute 排队.否则,submit将包裹你的 Callable进入 ExecutorService -特定对象并将其放入队列中,从而无法查询 Callable 的属性通过标准 API。

使用自定义 FutureTask

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    public MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable=callable;
    }
    Long getOrderId() {
        return theCallable.getOrderId();
    }
}

通过 threadPoolExecutor.execute(new MyFutureTask(new Formatter(order))); 将其排队,

您可以查询队列中的订单ID:

public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
    for(Object o: e.getQueue().toArray()) {
        if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
            return true;
    }
    return false;
}

这适用于任何 ExecutorService (假设它有一个队列)。如果您使用 ThreadPoolExecutor只是,您可以自定义其创建 FutureTask实例(从 Java 6 开始),而不是依赖提交者来做:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, handler);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof IFormatter)
            return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
        return super.newTaskFor(callable);
    }
}

然后,使用 MyThreadPoolExecutor 的实例而不是 ThreadPoolExecutor每次提交 IFormatter实例将使用 MyFutureTask 自动包装而不是标准 FutureTask .缺点是这只适用于这个特定的 ExecutorService并且泛型方法为特殊处理生成未经检查的警告。

关于java - 从 ThreadPoolTask​​Executor 获取可调用或将 Runnable 转换为 Callable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30789402/

相关文章:

c - 为什么调用 setjmp() 中的函数会发生段错误?

java - spring security 从数据库中获取用户 ID

java - 我需要用于 Eclipse 的 Tomcat 服务器吗

java - SwingUtilities 方法参数包含整个方法体?

wpf - 如何将 DispatcherTimer 设置到 for 循环中

spring - 如何为多个CacheManager创建多个CacheErrorHandler?

java - Spring MVC - 不支持请求方法 "POST"

java - 非法状态异常 : Content has been consumed - How to resolve?

java - 在 Java 中创建可下载文件并存储在 Web 应用程序子目录中

multithreading - 如何原子地使用一组句柄?