java - 在循环中重新创建ExecutorService以进行批处理

标签 java multithreading concurrency executorservice

示例:我需要处理数百万条记录,我想并行处理它们以加快处理速度。为此,我想使用 Executor Service 中的线程池。每个任务最多需要几秒钟。为了不在一个线程池中为每条记录创建数百万个线程(在我的情况下会导致内存问题),我决定批量处理记录。

我想为每个批处理使用一个新的线程池。我让 Executor Service 等待批处理任务完成,然后关闭 Executor Service 并创建一个新的服务来处理下一批任务。 我做了这样的事情:

/*...................*/
int count = 1;
ExecutorService executor = buildExecutor(CORE_THREADS, MAX_THREADS);
            while (/* there is a record */) {
                executor.execute(new ProcessRecordThread(record));
                count++;
                if (count % BATCH_SIZE == 0) {
                    executor.shutdown();
                    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                    executor = buildExecutor(CORE_THREADS, MAX_THREADS);
                }
            }
 /*................*/

创建Executor Service的方法

private static ExecutorService buildExecutor(int corePoolSize, int maximumPoolSize) {
            return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue(),
                    Executors.defaultThreadFactory());
        }

我知道创建线程池会增加一些处理开销。在循环中创建执行程序服务被认为是不好的做法。我应该注意哪些权衡?

有什么方法可以仅使用一个线程池来实现这一行为吗?

最佳答案

构建类有一个静态通用数据源,例如单例模式中的C3P0(开源)来管理连接。

配置c3p0.maxPoolSize = x并运行x个线程从dataSource获取连接,然后执行sql。

示例:

public class C3P0Pool {
    private static ComboPooledDataSource CPDS = new ComboPooledDataSource("c3p0");
    public static Connection getConnection() throws SQLException {
        return CPDS.getConnection();
    }
    public static DataSource getDataSource() {
        return CPDS;  // QueryRunner could constructed with DataSource
    }
}

您不必关闭数据源,执行connection.close()时连接会返回到数据源。

关于java - 在循环中重新创建ExecutorService以进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60464513/

相关文章:

java - Wiremock 和 XPath 属性测试

java - 奇怪的 HashSet contains() 行为

java - 包含集合的 Java 类上的哈希码实现

java - Java中如何在打开新线程之前关闭线程?

Java 惰性线程安全单例,使用 Final 字段实现

java - 如果我(需要)将 wait() 和 notify() 调用包含在同步块(synchronized block)中,那么如何执行 notify()?

java - 在 for 语句中使用 if 语句

java - 如何为每个请求项创建多个线程

c++ - MFC 将消息发送到主线程(而不是窗口)?

concurrency - 乐观与多版本并发控制 - 差异?