Java - Balancing a ThreadPoolExecutor to fairly give threads to parallel requests

Tags: java multithreading threadpoolexecutor

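I developed a piece of multi-threaded code. It is called from a web application, so it may be invoked by several threads (requests) in parallel. To keep control over the number of threads this code creates (when invoked by several parallel requests), I use a static, shared ThreadPoolExecutor (Executors.newFixedThreadPool(nbOfThreads)). So I am sure this code will never create more than nbOfThreads threads. To track the tasks involved in a given request and wait for them to finish, I use one CompletionService per request.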

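Now I would like some "fairness" (not sure that is the right word) in the way pool threads are given to requests. With the default fixed ThreadPoolExecutor, the waiting queue is a LinkedBlockingQueue. It hands tasks to the executor in arrival order (FIFO). Suppose the pool core size is 100 threads. A first, big request involves creating 150 tasks. It will therefore fill the pool and put 50 tasks in the waiting queue. If a second, small request arrives 1 second later, then even though it needs only 2 threads from the pool, its 2 tasks sit behind the 50 queued tasks of the first big request and are processed only after those have been picked up.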
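
To make the FIFO effect concrete, here is a minimal sketch that is not taken from the application. It assumes a fixed pool with a single thread (so the order is deterministic) that receives three tasks from a "big" request and then one task from a "small" request. The class name FifoStarvationDemo and the task labels are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a fixed pool serves tasks in arrival order.
class FifoStarvationDemo {

    // Submits bigTasks tasks for a "big" request, then one task for a "small"
    // request, and returns the order in which the tasks actually ran.
    static List<String> runOrder(int bigTasks) throws InterruptedException {
        List<String> order = new CopyOnWriteArrayList<>();
        // One worker thread only, so the execution order is deterministic
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < bigTasks; i++) {
            final int n = i;
            pool.execute(() -> order.add("big-" + n));
        }
        // The small request arrives last and is queued behind the big one
        pool.execute(() -> order.add("small-0"));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [big-0, big-1, big-2, small-0]
        System.out.println(runOrder(3));
    }
}
```

The small request's task always runs last, however little work it needs.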

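How can I make the pool give threads to each request fairly and evenly? How can I keep the 2 tasks of the second request from waiting behind all 50 waiting tasks of the first request?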

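My idea is to write my own BlockingQueue implementation and give it to the ThreadPoolExecutor. This BlockingQueue would store the waiting tasks grouped by the request that created them (in a backing Map, with the request id as key and a LinkedBlockingQueue holding that request's tasks as value). Then, when the ThreadPoolExecutor takes or polls a new task from the queue, the queue hands out a task from a different request each time... Is this the right approach? This use case seems quite common to me. I am surprised that I have to implement something so custom and tedious myself. That is why I think I may be wrong, and that a well-known best practice exists for this.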
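
The rotation described above can be sketched on its own, without any blocking or locking. This is only an illustration of the turn-taking logic, assuming a single thread. RoundRobinBuffer and its method names are hypothetical, and a real BlockingQueue would need synchronization on top of this.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, NOT thread-safe: it only illustrates the rotation.
class RoundRobinBuffer<T> {

    // Waiting tasks, grouped by the id of the request that created them
    private final Map<Integer, Deque<T>> tasksByRequest = new HashMap<>();

    // Ids of the requests that still have waiting tasks, in turn order
    private final Deque<Integer> turnOrder = new ArrayDeque<>();

    void add(int requestId, T task) {
        Deque<T> tasks = tasksByRequest.get(requestId);
        if (tasks == null) {
            // First waiting task of this request: it joins the end of the rotation
            tasks = new ArrayDeque<>();
            tasksByRequest.put(requestId, tasks);
            turnOrder.addLast(requestId);
        }
        tasks.addLast(task);
    }

    // Returns the next task, taken from a different request on each call
    // when several requests are waiting, or null if nothing is waiting.
    T poll() {
        Integer requestId = turnOrder.pollFirst();
        if (requestId == null) {
            return null;
        }
        Deque<T> tasks = tasksByRequest.get(requestId);
        T task = tasks.pollFirst();
        if (tasks.isEmpty()) {
            tasksByRequest.remove(requestId);
        } else {
            // The request still has tasks: it goes to the back of the rotation
            turnOrder.addLast(requestId);
        }
        return task;
    }

    public static void main(String[] args) {
        RoundRobinBuffer<String> buffer = new RoundRobinBuffer<>();
        buffer.add(0, "a0");
        buffer.add(0, "a1");
        buffer.add(0, "a2");
        buffer.add(1, "b0");
        String task;
        while ((task = buffer.poll()) != null) {
            System.out.println(task);
        }
    }
}
```

With tasks a0, a1, a2 from request 0 and b0 from request 1, poll() returns a0, b0, a1, a2: the small request is served second instead of last.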

Here is the code I wrote. It works, but I still wonder whether this is the right approach.

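import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.*;

// Assumed from the calls used below: Log4j 2 for logging and JUnit 4 for @Test
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;

public class TestThreadPoolExecutorWithTurningQueue {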

    private final static Logger logger = LogManager.getLogger();

    private static ThreadPoolExecutor executorService;

    int nbRequest = 4;

    int nbThreadPerRequest = 8;

    int threadPoolSize = 5;

    private void init() {
        executorService = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0L, TimeUnit.MILLISECONDS,
                new CategoryBlockingQueue<Runnable>()// my custom blocking queue storing waiting tasks per request
                //new LinkedBlockingQueue<Runnable>()
        );
    }

    @Test
    public void test() throws Exception {
        init();
        // Parallel requests arriving
        ExecutorService tomcat = Executors.newFixedThreadPool(nbRequest);
        for (int i = 0; i < nbRequest; i++) {
            Thread.sleep(10);
            final int finalI = i;
            tomcat.execute(new Runnable() {
                @Override
                public void run() {
                    request(finalI);
                }
            });
        }
        tomcat.shutdown();
        tomcat.awaitTermination(1, TimeUnit.DAYS);
    }

    // Code executed by each request
    // Code multi-threaded using a single shared ThreadPoolExecutor to keep the 
    // number of threads under control
    public void request(final int requestId) {
        final List<Future<Object>> futures = new ArrayList<>();
        CustomCompletionService<Object> completionService = new CustomCompletionService<>(executorService);
        for (int j = 0; j < nbThreadPerRequest; j++) {
            final int finalJ = j;
            futures.add(completionService.submit(new CategoryRunnable(requestId) {
                @Override
                public void run() {
                    logger.debug("thread " + finalJ + " of request " + requestId);
                    try {
                        // here should come the useful things to be done
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, null));
        }
        // Wait for completion of all the tasks of the request
        // If a task threw an exception, cancel the other tasks of the request
        for (int j = 0; j < nbThreadPerRequest; j++) {
            try {
                completionService.take().get();
            } catch (Exception e) {
                // Cancel the remaining tasks
                for (Future<Object> future : futures) {
                    future.cancel(true);
                }

                // Get the underlying exception
                Exception toThrow = e;
                if (e instanceof ExecutionException) {
                    ExecutionException ex = (ExecutionException) e;
                    toThrow = (Exception) ex.getCause();
                }
                throw new RuntimeException(toThrow);
            }
        }
    }

    public class CustomCompletionService<V> implements CompletionService<V> {

        private final Executor executor;

        private final BlockingQueue<Future<V>> completionQueue;

        public CustomCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }

        private RunnableFuture<V> newTaskFor(Callable<V> task) {
            return new FutureTask<V>(task);
        }

        private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            return new FutureTask<V>(task, result);
        }

        public Future<V> submit(CategoryCallable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new CategorizedQueueingFuture(f, task.getCategory()));
            return f;
        }

        public Future<V> submit(CategoryRunnable task) {
            return submit(task, null);
        }

        @Override
        public Future<V> submit(Callable<V> task) {
            throw new IllegalArgumentException("Must use a 'CategoryCallable'");
        }

        @Override
        public Future<V> submit(Runnable task, V result) {
            throw new IllegalArgumentException("Must use a 'CategoryRunnable'");
        }

        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }

        public Future<V> poll() {
            return completionQueue.poll();
        }

        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }

        /**
         * FutureTask extension to enqueue upon completion + Category
         */
        public class CategorizedQueueingFuture extends FutureTask<Void> {

            private final Future<V> task;

            private int category;

            CategorizedQueueingFuture(RunnableFuture<V> task, int category) {
                super(task, null);
                this.task = task;
                this.category = category;
            }

            protected void done() {
                completionQueue.add(task);
            }

            public int getCategory() {
                return category;
            }
        }
    }

    public abstract class CategoryRunnable implements Runnable {

        private int category;

        public CategoryRunnable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }

    public abstract class CategoryCallable<V> implements Callable<V> {

        private int category;

        public CategoryCallable(int category) {
            this.category = category;
        }

        public int getCategory() {
            return category;
        }
    }

    public class CategoryBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

        // Waiting tasks per category (request id); guarded by 'lock'
        private final Map<Integer, LinkedBlockingQueue<E>> map = new HashMap<>();

        private final AtomicInteger count = new AtomicInteger(0);

        private final ReentrantLock lock = new ReentrantLock();

        // Signalled whenever a task is added, so take() can wait without holding the lock
        private final Condition notEmpty = lock.newCondition();

        // Categories that still have waiting tasks, in turn order; guarded by 'lock'
        private final LinkedBlockingQueue<Integer> nextCategories = new LinkedBlockingQueue<>();

        @Override
        public boolean offer(E e) {
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) e;
            lock.lock();
            try {
                int category = item.getCategory();
                if (!map.containsKey(category)) {
                    map.put(category, new LinkedBlockingQueue<E>());
                    nextCategories.offer(category);
                }
                boolean b = map.get(category).offer(e);
                if (b) {
                    count.incrementAndGet();
                    // Wake up one worker waiting in take()
                    notEmpty.signal();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public E poll() {
            return null;
        }

        @Override
        public E peek() {
            return null;
        }

        @Override
        public void put(E e) throws InterruptedException {

        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return false;
        }

        @Override
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // Wait on the condition (which releases the lock) instead of blocking in
                // nextCategories.take() while holding it: that would keep offer() from
                // ever acquiring the lock to add a task
                while (nextCategories.isEmpty()) {
                    notEmpty.await();
                }
                Integer nextCategory = nextCategories.poll();
                LinkedBlockingQueue<E> categoryElements = map.get(nextCategory);
                E e = categoryElements.poll();
                count.decrementAndGet();
                if (categoryElements.isEmpty()) {
                    map.remove(nextCategory);
                } else {
                    // Tasks remain: this category goes to the back of the turn order
                    nextCategories.offer(nextCategory);
                }
                return e;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean remove(Object o) {
            CustomCompletionService.CategorizedQueueingFuture item = (CustomCompletionService.CategorizedQueueingFuture) o;
            lock.lock();
            try {
                int category = item.getCategory();
                LinkedBlockingQueue<E> categoryElements = map.get(category);
                boolean b = categoryElements.remove(item);
                if (categoryElements.isEmpty()) {
                    map.remove(category);
                }
                if (b) {
                    count.decrementAndGet();
                }
                return b;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public int drainTo(Collection<? super E> c) {
            return 0;
        }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            return 0;
        }

        @Override
        public Iterator<E> iterator() {
            return null;
        }

        @Override
        public int size() {
            return count.get();
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            // TODO
            return null;
        }

        @Override
        public int remainingCapacity() {
            return 0;
        }

    }
}

Output with the traditional LinkedBlockingQueue

2017-01-09 14:56:13,061 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:56:13,061 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-2] DEBUG - thread 5 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-1] DEBUG - thread 6 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-4] DEBUG - thread 7 of request 0
2017-01-09 14:56:15,063 [pool-2-thread-3] DEBUG - thread 0 of request 1
2017-01-09 14:56:15,063 [pool-2-thread-5] DEBUG - thread 1 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-2] DEBUG - thread 2 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-4] DEBUG - thread 3 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-3] DEBUG - thread 4 of request 1
2017-01-09 14:56:17,064 [pool-2-thread-5] DEBUG - thread 6 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:56:19,064 [pool-2-thread-1] DEBUG - thread 0 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-3] DEBUG - thread 1 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-5] DEBUG - thread 2 of request 2
2017-01-09 14:56:19,064 [pool-2-thread-2] DEBUG - thread 3 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-4] DEBUG - thread 4 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-3] DEBUG - thread 5 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-2] DEBUG - thread 7 of request 2
2017-01-09 14:56:21,064 [pool-2-thread-1] DEBUG - thread 0 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-4] DEBUG - thread 2 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-3] DEBUG - thread 1 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-2] DEBUG - thread 3 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-1] DEBUG - thread 4 of request 3
2017-01-09 14:56:23,064 [pool-2-thread-5] DEBUG - thread 5 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:56:25,064 [pool-2-thread-1] DEBUG - thread 6 of request 3

Output with my custom CategoryBlockingQueue

2017-01-09 14:54:54,765 [pool-2-thread-3] DEBUG - thread 2 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-2] DEBUG - thread 1 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-5] DEBUG - thread 4 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-1] DEBUG - thread 0 of request 0
2017-01-09 14:54:54,765 [pool-2-thread-4] DEBUG - thread 3 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-1] DEBUG - thread 0 of request 1
2017-01-09 14:54:56,767 [pool-2-thread-4] DEBUG - thread 0 of request 3
2017-01-09 14:54:56,767 [pool-2-thread-3] DEBUG - thread 5 of request 0
2017-01-09 14:54:56,767 [pool-2-thread-5] DEBUG - thread 0 of request 2
2017-01-09 14:54:56,767 [pool-2-thread-2] DEBUG - thread 6 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-1] DEBUG - thread 1 of request 1
2017-01-09 14:54:58,767 [pool-2-thread-5] DEBUG - thread 1 of request 2
2017-01-09 14:54:58,767 [pool-2-thread-2] DEBUG - thread 7 of request 0
2017-01-09 14:54:58,767 [pool-2-thread-4] DEBUG - thread 1 of request 3
2017-01-09 14:54:58,767 [pool-2-thread-3] DEBUG - thread 2 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-1] DEBUG - thread 2 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-5] DEBUG - thread 2 of request 3
2017-01-09 14:55:00,767 [pool-2-thread-2] DEBUG - thread 3 of request 1
2017-01-09 14:55:00,767 [pool-2-thread-4] DEBUG - thread 3 of request 2
2017-01-09 14:55:00,767 [pool-2-thread-3] DEBUG - thread 3 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-5] DEBUG - thread 4 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-3] DEBUG - thread 4 of request 2
2017-01-09 14:55:02,767 [pool-2-thread-2] DEBUG - thread 4 of request 3
2017-01-09 14:55:02,767 [pool-2-thread-1] DEBUG - thread 5 of request 1
2017-01-09 14:55:02,767 [pool-2-thread-4] DEBUG - thread 5 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-2] DEBUG - thread 5 of request 3
2017-01-09 14:55:04,767 [pool-2-thread-1] DEBUG - thread 6 of request 1
2017-01-09 14:55:04,767 [pool-2-thread-5] DEBUG - thread 6 of request 2
2017-01-09 14:55:04,767 [pool-2-thread-3] DEBUG - thread 6 of request 3
2017-01-09 14:55:04,768 [pool-2-thread-4] DEBUG - thread 7 of request 1
2017-01-09 14:55:06,768 [pool-2-thread-2] DEBUG - thread 7 of request 3
2017-01-09 14:55:06,768 [pool-2-thread-1] DEBUG - thread 7 of request 2

Best answer

I came across the link below; it may be useful for your own implementation of fair locking.

http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html

Regarding Java - Balancing a ThreadPoolExecutor to fairly give threads to parallel requests, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41546452/
