我有一个应用程序,需要将数据提取到 ES 中。由于有多行,因此我使用 ThreadPoolTaskExecutor 并行化 50 批数据的摄取。另外,我正在消费者中获取这些数据,并且在处理完所有数据之前我不想确认记录,因此我将任务封装在 Runnable 类中,并将它们添加到可调用列表中,并在列表包含所有数据时使用 invokeAll()。
我的ThreadPoolTaskExecutor
的 maxPoolSize 为 200
,corePoolSize
为 50
。因此,我期望应用程序一次使用 200
个线程,因为根据我的理解,当排队的任务完成时,ThreadPoolTaskExecutor
使用最多 corePoolSize
的线程。小于 corePoolSize
,稍后它开始将线程添加到 maxPoolSize
并执行任务。当我检查日志和 VisualVm 时,我发现只有 50
线程正在执行,因此,我将 corePoolSize
增加到 70
并发现发现 70
线程正在执行。由于列表超过 300 万,队列大小应超过 3000。该问题的示例代码。
public void Test() throws InterruptedException {
List<Callable<Object>> tagList = new ArrayList<Callable<Object>>(1000);
for(int i=0; i<400; i++){
tagList.add(Executors.callable(new Runnable() {
@Override
public void run() {
System.out.println("Thread "+ Thread.currentThread());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}));
}
ThreadPoolTaskExecutor.getThreadPoolExecutor().invokeAll(tagList);
}
信息 | 2021-02-14 00:06:32.086+05:30 |应用 | 134 | 134当前线程数:50 总线程数:200
信息 | 2021-02-14 00:06:33.085+05:30 |应用 | 134 | 134当前线程数:50 总线程数:200
如果我遗漏了什么,请有人解释一下。
最佳答案
maxPoolSize of my ThreadPoolTaskExecutor is 200 and corePoolSize is 50.So,I was expecting the application to use 200 threads at one time as by my understanding ThreadPoolTaskExecutor uses threads upto to corePoolSize when queued up tasks are less than the corePoolSize and later it starts adding threads upto the maxPoolSize and executes the tasks.
这不太正确。就像 ThreadPoolExecutor 的情况一样:
When a new task is submitted (..), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
因此,即使 maxPoolSize 为 200
,您也只运行 50 个线程。
来自ThreadPoolTaskExecutor人们可以阅读:
The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to Executors.newSingleThreadExecutor(), sharing a single thread for all tasks. Setting "queueCapacity" to 0 mimics Executors.newCachedThreadPool(), with immediate scaling of threads in the pool to a potentially very high number. Consider also setting a "maxPoolSize" at that point, as well as possibly a higher "corePoolSize" (see also the "allowCoreThreadTimeOut" mode of scaling).
看来您必须将 queueCapacity
设置为零才能满足您的要求。
关于java - 使用 corePoolSize 而不是 maxPoolSize 的 InvokeAll 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66188737/