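I have developed a piece of multi-threaded code. It is called from a web application, so it may be invoked by several threads (requests) in parallel. To keep the number of threads this code creates under control (given that several parallel requests call it), I use a static shared ThreadPoolExecutor (Executors.newFixedThreadPool(nbOfThreads)). That way I am sure this code never creates more than nbOfThreads threads. To keep track of the tasks involved in a given request and wait for them to complete, I use one CompletionService per request.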
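For reference, the setup described above can be sketched roughly as follows. This is only an illustration, not the original code: the class name `SharedPoolSketch`, the method `handleRequest`, the pool size and the dummy task results are all assumptions made for the example. It uses the standard `ExecutorCompletionService`, and daemon threads so that the demo can exit without an explicit shutdown.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedPoolSketch {
    // Hypothetical pool size; a single pool is shared by every request.
    static final int NB_OF_THREADS = 4;
    static final ExecutorService SHARED_POOL = Executors.newFixedThreadPool(NB_OF_THREADS, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // assumption for the demo: lets the JVM exit without shutdown()
        return t;
    });

    // Runs taskCount dummy tasks for one request and returns the sum of their results.
    static int handleRequest(int requestId, int taskCount) {
        // A fresh CompletionService per request, so take() only ever returns
        // futures that belong to this request.
        CompletionService<Integer> completion = new ExecutorCompletionService<>(SHARED_POOL);
        for (int j = 0; j < taskCount; j++) {
            final int taskIndex = j;
            completion.submit(() -> requestId * 100 + taskIndex);
        }
        int sum = 0;
        try {
            for (int j = 0; j < taskCount; j++) {
                sum += completion.take().get(); // blocks until one more task has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(handleRequest(1, 3)); // 100 + 101 + 102 = 303
    }
}
```

The pool bounds the total number of threads, while each request only waits on its own completion queue.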
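Now I would like some "fairness" (not sure this is the right word) in the way pool threads are given to requests. With the default fixed ThreadPoolExecutor, the waiting queue is a LinkedBlockingQueue. It hands tasks to the executor in arrival order (FIFO). Suppose the pool core size is 100 threads. A first, big request creates 150 tasks, so it fills the pool and puts 50 tasks in the waiting queue. If a second, small request arrives 1 second later, then even though it needs only 2 threads from the pool, its tasks sit behind the 50 queued ones, so it has to wait for the whole first request (all 150 tasks) to be served before it is processed.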
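The FIFO behaviour described above can be reproduced on a small scale. The following is a minimal sketch, assuming a pool of a single thread instead of 100 so that the order is deterministic; the class name `FifoStarvationDemo` and the task labels are made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FifoStarvationDemo {
    // Returns the order in which the queued tasks run on a one-thread pool.
    static List<String> runOrder() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the only worker so that everything submitted next goes to the queue.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // The "big" request queues three tasks first...
        for (int j = 0; j < 3; j++) {
            final String name = "big-" + j;
            pool.execute(() -> order.add(name));
        }
        // ...then the "small" request queues a single task.
        pool.execute(() -> order.add("small-0"));

        gate.countDown(); // release the worker
        pool.shutdown();  // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // [big-0, big-1, big-2, small-0]
    }
}
```

The small request's only task runs last, even though it could have been served much earlier.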
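How can I make the pool give threads to each request fairly and evenly? How can I keep the 2 tasks of the second request from waiting behind all 50 queued tasks of the first request?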
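My idea is to write my own BlockingQueue implementation and give it to the ThreadPoolExecutor. This BlockingQueue would store the waiting tasks grouped by the request that created them (in a backing Map whose keys are the request ids and whose values are LinkedBlockingQueues holding each request's tasks). Then, when the ThreadPoolExecutor calls take or poll to get a new task, the queue would return a task from a different request each time... Is this the right approach? This use case seems quite common to me. I am surprised that I have to implement something this custom and tedious myself. That is why I think I may be wrong, and that there may be a well-known best practice for doing this.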
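The rotation idea described above can be isolated from the blocking and locking concerns. The following is a minimal, single-threaded sketch of just the data structure; the class name `RoundRobinBuffer` and its methods are hypothetical, and it is deliberately not thread-safe and not a BlockingQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinBuffer<T> {
    // Waiting tasks grouped by request id.
    private final Map<Integer, Deque<T>> tasksByRequest = new HashMap<>();
    // Ids of the requests that have waiting tasks, in the order they will be served.
    private final Deque<Integer> rotation = new ArrayDeque<>();

    public void add(int requestId, T task) {
        Deque<T> tasks = tasksByRequest.get(requestId);
        if (tasks == null) {
            tasks = new ArrayDeque<>();
            tasksByRequest.put(requestId, tasks);
            rotation.addLast(requestId); // a new request joins the back of the rotation
        }
        tasks.addLast(task);
    }

    // Returns the next task, or null when nothing is waiting.
    public T poll() {
        Integer requestId = rotation.pollFirst();
        if (requestId == null) {
            return null;
        }
        Deque<T> tasks = tasksByRequest.get(requestId);
        T task = tasks.pollFirst();
        if (tasks.isEmpty()) {
            tasksByRequest.remove(requestId); // request fully drained
        } else {
            rotation.addLast(requestId);      // back of the line until its next turn
        }
        return task;
    }

    public static void main(String[] args) {
        RoundRobinBuffer<String> buffer = new RoundRobinBuffer<>();
        buffer.add(1, "big-0");
        buffer.add(1, "big-1");
        buffer.add(1, "big-2");
        buffer.add(2, "small-0");
        List<String> order = new ArrayList<>();
        for (String t = buffer.poll(); t != null; t = buffer.poll()) {
            order.add(t);
        }
        System.out.println(order); // [big-0, small-0, big-1, big-2]
    }
}
```

With the same submissions as in the FIFO case, the small request's task is now served second instead of last.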
Here is the code I wrote. It works, but I am still wondering whether this is the right approach.
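// Imports were missing from the post; Log4j 2 and JUnit 4 are assumed from the calls used below.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;

public class TestThreadPoolExecutorWithTurningQueue {
    private final static Logger logger = LogManager.getLogger();
    private static ThreadPoolExecutor executorService;
    int nbRequest = 4;
    int nbThreadPerRequest = 8;
    int threadPoolSize = 5;

    private void init() {
        executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS,
                new CategoryBlockingQueue<Runnable>() // my custom blocking queue storing waiting tasks per request
                // new LinkedBlockingQueue<Runnable>()
        );
    }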

    @Test
    public void test() throws Exception {
        init();
        // Parallel requests arriving
        ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest);
        for (int i = 0; i < nbRequest; i++) {
            Thread.sleep(10);
            final int finalI = i;
            tomcat.execute(new Runnable() {
                @Override
                public void run() {
                    request(finalI);
                }
            });
        }
        tomcat.shutdown();
        tomcat.awaitTermination(1, TimeUnit.DAYS);
    }

    // Code executed by each request.
    // It is multi-threaded and uses a single shared ThreadPoolExecutor to keep
    // the number of threads under control.
    public void request(final int requestId) {
        final List<Future<Object>> futures = new ArrayList<>();
        CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService);
        for (int j = 0; j < nbThreadPerRequest; j++) {
            final int finalJ = j;
            futures.add(completionService.submit(new CategoryRunnable(requestId) {
                @Override
                public void run() {
                    logger.debug("thread " + finalJ + " of request " + requestId);
                    try {
                        // here should come the useful things to be done
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so that cancel(true) is not silently swallowed
                        Thread.currentThread().interrupt();
                    }
                }
            }, null));
        }
        // Wait for completion of all the tasks of the request
        // If a task threw an exception, cancel the other tasks of the request
        for (int j = 0; j < nbThreadPerRequest; j++) {
            try {
                completionService.take().get();
            } catch (Exception e) {
                // Cancel the remaining tasks
                for (Future<Object> future : futures) {
                    future.cancel(true);
                }
                // Get the underlying exception (the cause may be an Error, so do not cast it to Exception)
                Throwable toThrow = e;
                if (e instanceof ExecutionException && e.getCause() != null) {
                    toThrow = e.getCause();
                }
                throw new RuntimeException(toThrow);
            }
        }
    }

    public class CustomCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final BlockingQueue<Future<V>> completionQueue;

        public CustomCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }

        private RunnableFuture<V> newTaskFor(Callable<V> task) {
            return new FutureTask<V>(task);
        }

        private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            return new FutureTask<V>(task, result);
        }

        public Future<V> submit(CategoryCallable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task) {
            return submit(task, null);
        }

        @Override
        public Future<V> submit(Callable<V> task) {
            throw new IllegalArgumentException("Must use a 'CategoryCallable'");
        }

        @Override
        public Future<V> submit(Runnable task, V result) {
            throw new IllegalArgumentException("Must use a 'CategoryRunnable'");
        }

        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }

        public Future<V> poll() {
            return completionQueue.poll();
        }

        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }

        /**
         * FutureTask extension to enqueue upon completion + Category
         */
        public class CategorizedQueueingFuture extends FutureTask<Void> {
            private final Future<V> task;
            private int category;

            CategorizedQueueingFuture(RunnableFuture<V> task, int category) {
                super(task, null);
                this.task = task;
                this.category = category;
            }

            protected void done() {
                completionQueue.add(task);
            }

            public int getCategory() {
                return category;
            }
        }
    }

    public abstract class CategoryRunnable implements Runnable {
        private int category;

        public CategoryRunnable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }

    public abstract class CategoryCallable<V> implements Callable<V> {
        private int category;

        public CategoryCallable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }
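
    public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
        // Waiting tasks grouped by category (request id); guarded by lock
        private final Map<Integer, LinkedBlockingQueue<E>> map = new HashMap<>();
        private final AtomicInteger count = new AtomicInteger(0);
        private final ReentrantLock lock = new ReentrantLock();
        // Signalled on every insertion, so that take() never blocks while holding the lock
        // (blocking on nextCategories.take() under the lock would deadlock with offer())
        private final Condition notEmpty = lock.newCondition();
        // Categories that currently have waiting tasks, in the order they will be served
        private final LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>();

        @Override
        public boolean offer(E e) {
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e;
            lock.lock();
            try {
                int category = item.getCategory();
                if (!map.containsKey(category)) {
                    map.put(category, new LinkedBlockingQueue<E>());
                    nextCategories.offer(category);
                }
                boolean b = map.get(category).offer(e);
                if (b) {
                    count.incrementAndGet();
                    notEmpty.signal();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        // Removes one task from the category whose turn it is, or returns null if the queue is empty.
        // The caller must hold the lock.
        private E dequeue() {
            Integer nextCategory = nextCategories.poll();
            if (nextCategory == null) {
                return null;
            }
            LinkedBlockingQueue<E> categoryElements = map.get(nextCategory);
            E e = categoryElements.poll();
            count.decrementAndGet();
            if (categoryElements.isEmpty()) {
                map.remove(nextCategory);
            } else {
                // Send the category to the back so that another request is served next
                nextCategories.offer(nextCategory);
            }
            return e;
        }

        @Override
        public E poll() {
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public E peek() {
            lock.lock();
            try {
                Integer nextCategory = nextCategories.peek();
                return nextCategory == null ? null : map.get(nextCategory).peek();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public void put(E e) throws InterruptedException {
            offer(e); // the queue is unbounded, so this never blocks
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(e); // the queue is unbounded, so the timeout is never needed
        }

        @Override
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await(); // releases the lock while waiting
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean remove(Object o) {
            if (!(o instanceof CustomCompletionService.CategorizedQueueingFuture)) {
                return false;
            }
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o;
            lock.lock();
            try {
                Integer category = Integer.valueOf(item.getCategory());
                LinkedBlockingQueue<E> categoryElements = map.get(category);
                if (categoryElements == null) {
                    return false;
                }
                boolean b = categoryElements.remove(item);
                if (categoryElements.isEmpty()) {
                    map.remove(category);
                    // Keep the rotation in sync, otherwise take() would find no queue for this category
                    nextCategories.remove(category);
                }
                if (b) {
                    count.decrementAndGet();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            lock.lock();
            try {
                int n = 0;
                E e;
                while (n < maxElements && (e = dequeue()) != null) {
                    c.add(e);
                    n++;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public Iterator<E> iterator() {
            // Snapshot iterator: removing through it does not modify the queue
            lock.lock();
            try {
                List<E> snapshot = new ArrayList<>();
                for (LinkedBlockingQueue<E> categoryElements : map.values()) {
                    snapshot.addAll(categoryElements);
                }
                return snapshot.iterator();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public int size() {
            return count.get();
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            lock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

        @Override
        public int remainingCapacity() {
            return Integer.MAX_VALUE; // unbounded
        }
    }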
}
Output with the traditional LinkedBlockingQueue
2017-01-09 14:56:13,061 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-2] DEBUG - thread 5 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-1] DEBUG - thread 6 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-4] DEBUG - thread 7 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-3] DEBUG - thread 0 of request 1
2017-01-09 14:56:15,063 [pool-2-thread-5] DEBUG - thread 1 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-2] DEBUG - thread 2 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-4] DEBUG - thread 3 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-3] DEBUG - thread 4 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-5] DEBUG - thread 6 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-1] DEBUG - thread 0 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-3] DEBUG - thread 1 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-5] DEBUG - thread 2 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-2] DEBUG - thread 3 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-4] DEBUG - thread 4 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-3] DEBUG - thread 5 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-2] DEBUG - thread 7 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-1] DEBUG - thread 0 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-4] DEBUG - thread 2 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-3] DEBUG - thread 1 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-2] DEBUG - thread 3 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-1] DEBUG - thread 4 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-5] DEBUG - thread 5 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-1] DEBUG - thread 6 of request 3
Output with my custom CategoryBlockingQueue
2017-01-09 14:54:54,765 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-1] DEBUG - thread 0 of request 1
2017-01-09 14:54:56,767 [pool-2-thread-4] DEBUG - thread 0 of request 3
2017-01-09 14:54:56,767 [pool-2-thread-3] DEBUG - thread 5 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-5] DEBUG - thread 0 of request 2
2017-01-09 14:54:56,767 [pool-2-thread-2] DEBUG - thread 6 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-1] DEBUG - thread 1 of request 1
2017-01-09 14:54:58,767 [pool-2-thread-5] DEBUG - thread 1 of request 2
2017-01-09 14:54:58,767 [pool-2-thread-2] DEBUG - thread 7 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-4] DEBUG - thread 1 of request 3
2017-01-09 14:54:58,767 [pool-2-thread-3] DEBUG - thread 2 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-1] DEBUG - thread 2 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-5] DEBUG - thread 2 of request 3
2017-01-09 14:55:00,767 [pool-2-thread-2] DEBUG - thread 3 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-4] DEBUG - thread 3 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-3] DEBUG - thread 3 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-5] DEBUG - thread 4 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-3] DEBUG - thread 4 of request 2
2017-01-09 14:55:02,767 [pool-2-thread-2] DEBUG - thread 4 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-4] DEBUG - thread 5 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-2] DEBUG - thread 5 of request 3
2017-01-09 14:55:04,767 [pool-2-thread-1] DEBUG - thread 6 of request 1
2017-01-09 14:55:04,767 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-3] DEBUG - thread 6 of request 3
2017-01-09 14:55:04,768 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:55:06,768 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:55:06,768 [pool-2-thread-1] DEBUG - thread 7 of request 2
Best answer
I have put a link below; it may be useful for your own implementation of fair locking.
http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html
Regarding Java - balancing a ThreadPoolExecutor to fairly give threads to parallel requests, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41546452/