java - 固定和动态调整工作线程的数量

标签 java multithreading threadpool executorservice worker

我目前正在实现一个数据容器/数据结构,它具有 addRecord(final Redcord record) 和 close() 方法。

public class Container{

    Container() {
    }

    public void addRecord(final Record record){
        // do something with the record
        // add the modified it to the container
    }

    public void close(){

    }     
}

由于为了将记录存储到容器中,需要完成几个步骤,因此我想在最终将它们添加到容器之前将这些步骤外包给线程。

我的问题是,我不希望任何容器实例使用超过 4 个线程,并且如果我同时打开多个容器实例,则总共不应使用超过 12 个线程。除此之外,我希望一旦创建了第 4 个容器,其他 3 个打开的容器中的每一个都应该丢失其中一个线程,这样所有 4 个容器都可以各使用 3 个线程。

我正在研究 ThreadPools 和 ExecutorServices。虽然设置大小为 12 的线程池很简单,但在使用 ExecutorServices 时很难将使用的线程数限制为 4。回想一下,我最多需要 12 个线程,这就是为什么我有一个在容器实例之间共享的大小为 12 的静态实例。

这就是我开始的

public class Container{

    public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

    final List<Future<Entry>> _fRecords = new LinkedList<>();

    Container() {
    }

    public void addRecord(final Record record){
        _fRecords.add(SERVICE.submit(new Callable<Entry>{

            @Override
            public Entry call() throws Exception {
                return record.createEntry();
            }
        }));
    }

    public void close() throws Exception {
        for(final Future<Entry> f) {
            f.get(); // and add the entry to the container           
        }
    }     
}

显然,这不能确保单个实例最多使用 4 个线程。此外,如果另一个容器需要几个线程,我不知道如何强制这些 Executorservices 关闭工作线程或重新分配它们。

问题:

  1. 由于Redord本身是一个接口(interface),并且createEntry()的运行时间取决于实际的实现,所以我想确保不同的容器不使用相同的线程/工作线程,即我不希望任何线程都为两个不同的容器执行工作。这背后的想法是,我不希望仅处理“长时间运行”记录的容器实例被仅处理“短期运行记录”的容器减慢。如果这在概念上没有意义(对你来说),则不需要为不同的容器提供隔离的线程/工作线程。

  2. 实现我自己的 ExecutorService 是正确的方法吗?有人可以向我指出一个具有类似问题/解决方案的项目吗?否则我想我必须实现自己的 ExecutorService 并在我的容器之间共享它,或者有更好的解决方案吗?

  3. 目前,我正在 close 方法中收集所有 future,因为容器必须按照记录/条目到达的顺序存储它们。显然,这需要大量时间,因此我想在 Future 完成后立即执行此操作。让额外的工作人员单独处理 future 是否有意义,或者让我的工作人员完成这项工作(即进行线程交互)更好。

最佳答案

我不认为 Java 运行时提供的东西可以满足您开箱即用的需求。

我编写了自己的类来满足此类需求(公共(public)池+给定队列的限制)。

public static class ThreadLimitedQueue {
    private final ExecutorService executorService;
    private final int limit;

    private final Object lock = new Object();
    private final LinkedList<Runnable> pending = new LinkedList<>();
    private int inProgress = 0;

    public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
        this.executorService = executorService;
        this.limit = limit;
    }

    public void submit(Runnable runnable) {
        final Runnable wrapped = () -> {
            try {
                runnable.run();
            } finally {
                onComplete();
            }
        };
        synchronized (lock) {
            if (inProgress < limit) {
                inProgress++;
                executorService.submit(wrapped);
            } else {
                pending.add(wrapped);
            }
        }
    }

    private void onComplete() {
        synchronized (lock) {
            final Runnable pending = this.pending.poll();
            if (pending == null || inProgress > limit) {
                inProgress--;
            } else {
                executorService.submit(pending);
            }
        }
    }
}

我的情况唯一的区别limit是常量,但您可以修改它。例如,您可以替换 int limitSupplier<Integer> limitFunction并且该函数必须提供动态限制,例如

Supplier<Integer> limitFunction = 12 / containersList.size();

只是让它更健壮(例如,如果containersList为空或超过12该怎么办)

关于java - 固定和动态调整工作线程的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55784995/

相关文章:

c# - 线程(任务)中的异常不会像示例建议的那样传播

java - 登录后在类之间使用 session 变量 - 空对象引用

java - 如何在 Java 中创建特定格式的 JSON 文件

ios - 如果我在 dispatch_async block 中有一个 dispatch_sync 调用,然后是第二个 dispatch 调用,那么第二个调用是同步还是异步有关系吗?

multithreading - Vulkan中的多线程渲染(生成命令缓冲区)比单线程慢

java - 处于等待状态的线程 - Apache Tomcat

java - 如何使用 Spring Security 通过基本身份验证来保护 swagger-ui

java - 如何在 ViewPager 的每个部分中使用 fragment

具有 const 函数的 C++ 线程安全

c# - 使用 COM 对象的后台 worker 仍然锁定 UI