java - Spring batch - 如何使用 ThreadPoolTask​​Executor 动态增加和减少线程

标签 java concurrency spring-batch

我是 Spring-Batch 的新手,所以我可能缺乏理解。我想了解如何在我的作业运行时结合使用 ThreadPoolTask​​Executor 和 ThreadPoolExecutor 来动态增加和减少 线程。我尝试将 ThreadPoolTask​​Executor 和 ThreadPoolExecutor 都子类化,这样我就可以访问 beforeExecute() 和 afterExecute() ,这将允许我在使用本网站上列出的方法减少 corepoolsize 时终止线程。

我似乎不理解的是,当我覆盖返回 ExecutorService 的 initializeExecutor() 方法时,它显然没有在父类 (ThreadPoolTask​​Executor) 中设置(私有(private)内部)threadPoolExecutor 变量。它设置私有(private)的ExecutorService执行器; (来自 ExecutorConfigurationSupport 类)

由于 threadPoolExecutor 不是 protected 成员,我无法访问它。如果没有设置,当我运行时,当我检查幕后问题时,显然最终会在 Spring Framework 中收到“ThreadPoolExecutor not initialized”错误。

公共(public)类 MyThreadPoolTask​​Executor 扩展 ThreadPoolTask​​Executor {

@Override
protected ExecutorService initializeExecutor(ThreadFactory tf, RejectedExecutionHandler reh)
{

    BlockingQueue <Runnable> queue = createQueue(Integer.MAX_VALUE);

    MyThreadPoolExecutor tp_executor = new MyThreadPoolExecutor( this.getCorePoolSize(), this.getMaxPoolSize(),     this.getKeepAliveSeconds(), TimeUnit.SECONDS, queue);

    // if you look at the parent class(ThreadPoolTaskExecutor) it performs this call next.
    //  this.threadPoolExecutor = executor;
    // that is a private member with no ability to set via any methods.

    return tp_executor;
}

公共(public)类 MyThreadPoolExecutor 扩展 ThreadPoolExecutor {

public MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTimeout, TimeUnit timeunit, BlockingQueue<Runnable> workQueue, ThreadFactory tf, RejectedExecutionHandler reh) 
{
    super(corePoolSize, maxPoolSize, keepAliveTimeout, timeunit, workQueue, tf, reh);
}

protected void beforeExecute (final Thread thread, final Runnable job) 
{   
  ...
}

有人可以解释一下我的方法中缺少什么吗?

最佳答案

我假设您想在一个作业步骤中使用一个线程数,而在另一个作业步骤中使用另一个线程数。实现这一点的简单方法是声明两个具有必要线程数的独立执行程序,零 corePoolSize(在不需要时不创建线程)和零 keepAliveSeconds(以当不再需要时不要保留线程)。然后只需在一个步骤中注入(inject)第一个执行程序,在另一个步骤中注入(inject)第二个执行程序。

@Configuration
public class Conf {

    @Bean
    public TaskExecutor executorA(@Value("${first.number.of.threads}") int numberOfThreads) {
        return executor(numberOfThreads);
    }

    @Bean
    public TaskExecutor executorB(@Value("${second.number.of.threads}") int numberOfThreads) {
        return executor(numberOfThreads);
    }

    private TaskExecutor executor(int numberOfThreads) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(0);
        executor.setMaxPoolSize(numberOfThreads);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(0);
        return executor;
    }
}

关于java - Spring batch - 如何使用 ThreadPoolTask​​Executor 动态增加和减少线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38925116/

相关文章:

java - 如何使用 RESTEasy 客户端将列表作为查询参数发送

Java正则表达式至少匹配斯堪的纳维亚字母之一

带有 fork 的 Java 并行进程

ios - performBlockAndWait 在 iOS 7 上具有私有(private)队列死锁父级的子上下文

java - Spring Batch 自定义 ItemReader 无法打开

spring-batch - 处理后如何将文件移动到存档和错误文件夹

java - Google 安全浏览 v4 API java

java - 释放 Java 崩溃时的锁

java - Spring 批处理 : different job launcher for different jobs

java - 使用 == 检查两个字符是否相同 (Java)