如下面的链接所述:-
How to get the ThreadPoolExecutor to increase threads to max before queueing?
我将队列实现更改为在输入元素后返回 false。 因此,每当将新任务插入队列时,都会为其创建一个新线程。
但是当我使用记录器大规模运行以下实现(Bis 系统测试)时,会产生一个新问题。
当一个任务需要执行时,它会被插入到队列中,当队列返回 false 时,就会创建一个新线程来执行它。池中当前的空闲线程不会被拾取。原因是任务通过 getTask()
方法分配给空闲线程,该方法从队列中选取任务。所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配执行任务而不是创建新线程? ??
下面的输出会让它更清楚:-
Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
代码文件如下:-
ThreadPoolExecutor 的版本是 java 1.5,因为我们在服务器计算机上使用 1.5,无法升级它。
ThreadPoolExecutor:-
public void execute(Runnable command) {
System.out.println("Active Count: " + getActiveCount()
+ " Pool Size : " + getPoolSize() + " Idle Count: "
+ (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
int status = addIfUnderMaximumPoolSize(command);
if (status > 0) // created new thread
return;
if (status == 0) { // failed to create thread
reject(command);
return;
}
// Retry if created a new thread but it is busy with another task
}
}
LinkedBlockingQueue:-
public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E>
{
/**
*
*/
private static final long serialVersionUID = 1L;
public CustomBlockingQueue() {
super(Integer.MAX_VALUE);
}
public boolean offer(E e) {
return false;
}
}
在拒绝处理程序中,我们正在调用我们尚未重写的队列的 put 方法
调用执行器
final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());
private static final int TASK_COUNT = 100;
for (int i = 0; i < TASK_COUNT; i++) {
......
tpe.execute(new Task(i));
.....
}
我们调用核心池大小为 3 的执行器,最大池大小为 8,并使用无界链接阻塞队列来执行任务。
最佳答案
使用 SynchronousQueue
实现“在排队之前启动但更喜欢现有线程”行为的最简单方法。当且仅当已经有等待的接收者时,它才会接受提供的元素。因此空闲线程将获取项目,一旦没有空闲线程,ThreadPoolExecutor 将启动新线程。
唯一的缺点是,一旦所有线程启动,您就不能简单地将挂起的项目放入队列中,因为它没有容量。因此,您要么必须接受提交者被阻止,要么需要另一个队列来将挂起的任务放入其中,并需要另一个后台线程来尝试将这些挂起的项目放入同步队列。这个额外的线程不会损害性能,因为它大部分时间都被阻塞在这两个队列中的任何一个中。
class QueuingRejectionHandler implements RejectedExecutionHandler {
final ExecutorService processPending=Executors.newSingleThreadExecutor();
public void rejectedExecution(
final Runnable r, final ThreadPoolExecutor executor) {
processPending.execute(new Runnable() {
public void run() {
executor.execute(r);
}
});
}
}
…
ThreadPoolExecutor e=new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit,
new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());
关于java - 更改ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19677308/