java - 更改ThreadPoolExecutor

标签 java multithreading java.util.concurrent threadpoolexecutor

如下面的链接所述:-

How to get the ThreadPoolExecutor to increase threads to max before queueing?

我将队列实现更改为在输入元素后返回 false。 因此,每当将新任务插入队列时,都会为其创建一个新线程。

但是当我使用记录器大规模运行以下实现(Bis 系统测试)时,会产生一个新问题。

当一个任务需要执行时,它会被插入到队列中,当队列返回 false 时,就会创建一个新线程来执行它。池中当前的空闲线程不会被拾取。原因是任务通过 getTask() 方法分配给空闲线程,该方法从队列中选取任务。所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配执行任务而不是创建新线程? ??

下面的输出会让它更清楚:-

Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

代码文件如下:-

ThreadPoolExecutor 的版本是 java 1.5,因为我们在服务器计算机上使用 1.5,无法升级它。

ThreadPoolExecutor:-

 public void execute(Runnable command) {
    System.out.println("Active Count: " + getActiveCount()
            + " Pool Size : " + getPoolSize() + " Idle Count: "
            + (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
          if (command == null)
              throw new NullPointerException();
          for (;;) {
              if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
              if (status == 0) {   // failed to create thread
                  reject(command);
                  return;
              }
              // Retry if created a new thread but it is busy with another task
          }
      }

LinkedBlockingQueue:-

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public CustomBlockingQueue() {
    super(Integer.MAX_VALUE);
    }

    public boolean offer(E e) {
       return false;
    }

}

在拒绝处理程序中,我们正在调用我们尚未重写的队列的 put 方法

调用执行器

   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

 for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

我们调用核心池大小为 3 的执行器,最大池大小为 8,并使用无界链接阻塞队列来执行任务。

最佳答案

使用 SynchronousQueue 实现“在排队之前启动但更喜欢现有线程”行为的最简单方法。当且仅当已经有等待的接收者时,它才会接受提供的元素。因此空闲线程将获取项目,一旦没有空闲线程,ThreadPoolExecutor 将启动新线程。

唯一的缺点是,一旦所有线程启动,您就不能简单地将挂起的项目放入队列中,因为它没有容量。因此,您要么必须接受提交者被阻止,要么需要另一个队列来将挂起的任务放入其中,并需要另一个后台线程来尝试将这些挂起的项目放入同步队列。这个额外的线程不会损害性能,因为它大部分时间都被阻塞在这两个队列中的任何一个中。

class QueuingRejectionHandler implements RejectedExecutionHandler {

  final ExecutorService processPending=Executors.newSingleThreadExecutor();

  public void rejectedExecution(
      final Runnable r, final ThreadPoolExecutor executor) {
    processPending.execute(new Runnable() {
      public void run() {
        executor.execute(r);
      }
    });
  }
}

ThreadPoolExecutor e=new ThreadPoolExecutor(
  corePoolSize, maximumPoolSize, keepAliveTime, unit,
  new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());

关于java - 更改ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19677308/

相关文章:

c# - Java Tcp Server 和 .Net Tcp Client 发送和接收问题

c++ - glibmm 超时信号

concurrency - 高吞吐量写入 Java 8 并发中的变量?

java - Firebase removeValue() 函数未删除正确的值

java - for 循环语法有什么问题?

java - 如果操作数是较小的类型,JLS 在哪里指定加法的结果是 int?

c# - ConcurrentDictionary 陷阱 - 来自 GetOrAdd 和 AddOrUpdate 的委托(delegate)工厂是否同步?

c# - 处理器使用情况(强制完全使用)

java - 寻找 java.util.Set 的无界、基于队列的并发实现

java - ConcurrentHashMap 更新存在值线程安全