我是 Spring-Batch 的新手,所以我可能缺乏理解。我想了解如何在我的作业运行时结合使用 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 来动态增加和减少 线程。我尝试将 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 都子类化,这样我就可以访问 beforeExecute() 和 afterExecute() ,这将允许我在使用本网站上列出的方法减少 corepoolsize 时终止线程。
我似乎不理解的是,当我覆盖返回 ExecutorService 的 initializeExecutor() 方法时,它显然没有在父类 (ThreadPoolTaskExecutor) 中设置(私有(private)内部)threadPoolExecutor 变量。它设置私有(private)的ExecutorService执行器; (来自 ExecutorConfigurationSupport 类)
由于 threadPoolExecutor 不是 protected 成员,我无法访问它。如果没有设置,当我运行时,当我检查幕后问题时,显然最终会在 Spring Framework 中收到“ThreadPoolExecutor not initialized”错误。
公共(public)类 MyThreadPoolTaskExecutor 扩展 ThreadPoolTaskExecutor {
@Override
protected ExecutorService initializeExecutor(ThreadFactory tf, RejectedExecutionHandler reh)
{
BlockingQueue <Runnable> queue = createQueue(Integer.MAX_VALUE);
MyThreadPoolExecutor tp_executor = new MyThreadPoolExecutor( this.getCorePoolSize(), this.getMaxPoolSize(), this.getKeepAliveSeconds(), TimeUnit.SECONDS, queue);
// if you look at the parent class(ThreadPoolTaskExecutor) it performs this call next.
// this.threadPoolExecutor = executor;
// that is a private member with no ability to set via any methods.
return tp_executor;
}
公共(public)类 MyThreadPoolExecutor 扩展 ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTimeout, TimeUnit timeunit, BlockingQueue<Runnable> workQueue, ThreadFactory tf, RejectedExecutionHandler reh)
{
super(corePoolSize, maxPoolSize, keepAliveTimeout, timeunit, workQueue, tf, reh);
}
protected void beforeExecute (final Thread thread, final Runnable job)
{
...
}
有人可以解释一下我的方法中缺少什么吗?
最佳答案
我假设您想在一个作业步骤中使用一个线程数,而在另一个作业步骤中使用另一个线程数。实现这一点的简单方法是声明两个具有必要线程数的独立执行程序,零 corePoolSize
(在不需要时不创建线程)和零 keepAliveSeconds
(以当不再需要时不要保留线程)。然后只需在一个步骤中注入(inject)第一个执行程序,在另一个步骤中注入(inject)第二个执行程序。
@Configuration
public class Conf {
@Bean
public TaskExecutor executorA(@Value("${first.number.of.threads}") int numberOfThreads) {
return executor(numberOfThreads);
}
@Bean
public TaskExecutor executorB(@Value("${second.number.of.threads}") int numberOfThreads) {
return executor(numberOfThreads);
}
private TaskExecutor executor(int numberOfThreads) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(0);
executor.setMaxPoolSize(numberOfThreads);
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(0);
return executor;
}
}
关于java - Spring batch - 如何使用 ThreadPoolTaskExecutor 动态增加和减少线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38925116/