java - 线程池执行器具有优先任务并避免饥饿

标签 java multithreading threadpool

对于我的用例,我需要一个可以根据优先级执行任务的执行器。实现此目的的简单方法是使用带有 PriorityBlockingQueue 的线程池并重写 newTaskFor() 以返回根据任务优先级进行比较的自定义 future 任务。

//Define priorities
public enum Priority {
    HIGH, MEDIUM, LOW, VERYLOW;
}

优先任务

//A Callable tasks that has priority. Concrete implementation will implement 
//call() to do actual work and getPriority() to return priority
public abstract class PriorityTask<V> implements Callable<V> {
    public abstract Priority getPriority ();
}

实际执行器实现

public class PriorityTaskThreadPoolExecutor <V> {
    int _poolSize;
    private PriorityBlockingQueue<Runnable> _poolQueue = 
                                       new PriorityBlockingQueue<Runnable>(500); 
    private ThreadPoolExecutor _pool;

    public PriorityTaskThreadPoolExecutor (int poolSize) {
        _poolSize = poolSize;

        _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES, 
                                      _poolQueue) {
                        //Override newTaskFor() to return wrap PriorityTask 
                        //with a PriorityFutureTaskWrapper.
                        @Override
                        protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
                            return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c);
                        }
                };

        _pool.allowCoreThreadTimeOut(true);
    }

    public Future<V> submit (PriorityTask<V> task) {
        return _pool.submit(task);
    }

}

//A future task that wraps around the priority task to be used in the queue
class PriorityFutureTaskWrapper<V> extends FutureTask<V> 
                             implements Comparable <PriorityFutureTaskWrapper<V>> {
    PriorityTask<V> _priorityTask;

    public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) {
        super(priorityTask);
        _priorityTask = priorityTask;
    }

    public PriorityTask<V> getPriorityTask () {
        return _priorityTask;
    }

    @Override
    public int compareTo(PriorityFutureTaskWrapper<V> o) {
        return _priorityTask.getPriority().ordinal() - 
               o.getPriorityTask().getPriority().ordinal();
    }
}

问题是,在我的用例中,低优先级任务可能会永远饥饿。我想避免这种情况。我找不到使用 java 中可用的执行器/池来执行此操作的干净方法。所以我正在考虑编写自己的执行器。我有两种不同的方法。

1) 带有 PriorityBlockingQueue 的自定义线程池。将有一个单独的线程,检查队列中的任务年龄。较旧的任务将被删除并以更高的优先级重新添加。

2) 我的用例只有有限数量的优先级,例如 1-4。我将为每个优先级设置 4 个不同的队列。现在,自定义池中的线程在必须执行下一个任务时将按以下顺序扫描队列,而不是阻塞在队列上。

40% 线程 - 第一季度、第二季度、第三季度、第四季度

30% 线程 - 第二季度、第一季度、第三季度、第四季度

20% 线程 - 第三季度、第一季度、第二季度、第四季度

10% 线程 - 第四季度、第一季度、第二季度、第三季度

当线程收到队列中新添加的通知或该线程执行的当前任务完成时,将由线程完成扫描。其他时候,线程将等待。但是,与队列上的阻塞相比,扫描的效率仍然较低。

方法 2 更适合我的用例。

是否有人尝试过这些方法中的任何一种或针对类似用例的不同方法?有什么想法/建议吗?

最佳答案

没有简单的方法可以更改 PriorityQueue 中已插入元素的优先级,已经讨论过 here

您的第二个选项应该易于实现,例如从高优先级队列处理比从低优先级队列处理更多的任务

您还可以考虑为每个优先级使用不同的线程池,每个池中的线程数量取决于任务的优先级。

关于java - 线程池执行器具有优先任务并避免饥饿,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38953331/

相关文章:

java - 将返回的变量传递给其他方法

c++ - 生成多个 std::thread 并重用它

python - 在 Python 中使用多线程计算阶乘

java - Future<?>? 的可运行实例

c# - 如何以线程安全的方式关闭表单(从后台线程使用)?

java - Collections2.filter 方法不满足传递的参数

java - (Arquillian REST 扩展)为什么 webtarget 在测试方法中为空?

java - 使用 jacob 库删除和更新 Outlook 联系人

java - 为什么响应式(Reactive)编程应用程序(vert.x)比单线程无锁、无阻塞 java 应用程序更快?

c++ - 如何仅在所有任务完成后才使用c++中的boost向线程池添加新任务