java - 处理文件和数据库插入的最快方法 - Java 多线程

标签 java multithreading h2

我对多线程编码完全陌生。

这是我的要求: 我有一个包含 50 000 - 300 000 条记录的文件。

它是基于列的数据(4 列),以空格作为分隔符。我需要使用空格分割行并将记录保存在 DB 中的 4 列中。

我想开发一个多线程应用程序,它将数据插入到具有 4 列的 H2 DB(使用 JDBC/其他什么?),大约需要 2 秒。 我需要根据收到的记录数动态更改线程池大小。

我正在使用 Java Swings 开发桌面应用程序。 (不是基于网络的应用程序)

我不知道是否有更好的并发类可以更快地完成此任务。

如果不是多线程,还有其他办法吗?或任何其他框架?

添加批处理后,大约需要 5 秒,处理 250,000 条记录:

    BufferedReader in = new BufferedReader(new FileReader(file));
    java.util.List<String[]> allLines = new ArrayList<String[]>(); // used for something else

    String sql = "insert into test (a, b, c, d)” +
            " values (?,?,?,?)";

    PreparedStatement pstmt = conn.prepareStatement(sql);
    int i=0;
    while ((line = in.readLine()) != null) {

        line = line.trim().replaceAll(" +", " ");
        String[] sp = line.split(" ");
        String msg = line.substring(line.indexOf(sp[5]));
        allLines.add(new String[]{sp[0] + " " + sp[1], sp[4], sp[5], msg});

        pstmt.setString(1, sp[0] + " " + sp[1]);
        pstmt.setString(2, sp[4]);
        pstmt.setString(3, sp[5]);
        pstmt.setString(4, msg);

        pstmt.addBatch();

        i++;

        if (i % 1000 == 0){
            pstmt.executeBatch();
            conn.commit();
        }
    }

    pstmt.executeBatch();

最佳答案

通过以下方式改进逻辑:

  • PreparedStatement的实例上创建并在每次插入时使用它
  • 使用批量仅发送大包插页

这可以通过以下方式完成:

private PreparedStatement pstmt;

public BatchInsertion(String sql) throws SQLException{
    pstmt = conn.prepareStatement(sql)
}

public int insert(String a, String b, String c, String d) throws SQLException{
    pstmt.setString(1, a);
    pstmt.setString(2, b);
    pstmt.setString(3, c);
    pstmt.setString(4, d);

    pstmt.addBatch();
    return batchSize++;
}

public void sendBatch() throws SQLException{
    pstmt.executeBatch();
}

在那里,您只需要管理该实例的插入,当您到达最后一个项目或说批处理中的 1000 个项目时,发送它。

我用它来不强制插入 Collection首先。

注意:最后需要关闭语句,我会实现AutoCloseable在这样的类上执行此操作,并且您需要尝试使用资源才能安全。

<小时/>

如果您需要多线程此插入。我建议采用以下架构:

创建一个线程池,每个线程池都会有一个连接和一批插入数据。 使用一个队列来插入从文件中推送数据。 每个线程都会获取一个值并将其添加到批处理中。

通过这种架构,您可以轻松增加线程数量。

一、轻量化BatchInsert类能够使其运行:

class BatchInsert implements AutoCloseable {

    private int batchSize = 0;
    private final int batchLimit;

    public BatchInsert(int batchLimit) {
        this.batchLimit = batchLimit;
    }

    public void insert(String a, String b, String c, String d) {
        if (++batchSize >= batchLimit) {
            sendBatch();
        }
    }

    public void sendBatch() {
        System.out.format("Send batch with %d records%n", batchSize);
        batchSize = 0;
    }

    @Override
    public void close() {
        if (batchSize != 0) {
            sendBatch();
        }
    }
}

然后,我使用某种平衡器来提供一个队列和一些 Thread共享同一个队列。

class BalanceBatch {
    private final List<RunnableBatch> threads = new ArrayList<>();

    private Queue<String> queue = new ConcurrentLinkedQueue<>();
    private static final int BATCH_SIZE = 50_000;

    public BalanceBatch(int nbThread) {
        IntStream.range(0, nbThread).mapToObj(i -> new RunnableBatch(BATCH_SIZE, queue)).forEach(threads::add);
    }

    public void send(String value) {
        queue.add(value);
    }

    public void startAll() {
        for (RunnableBatch t : threads) {
            new Thread(t).start();
        }
    }

    public void stopAll() {
        for (RunnableBatch t : threads) {
            t.stop();
        }
    }
}

然后我实现读取这些可运行实例的队列的逻辑。他们的想法是读取队列并将其发送到批处理,直到队列为空并收到命令“STOP”。

class RunnableBatch implements Runnable {

    private boolean started = true;
    private Queue<String> queue;
    private int batchLimit;

    public RunnableBatch(int batchLimit, Queue<String> queue) {
        this.batchLimit = batchLimit;
        this.queue = queue;
    }

    @Override
    public void run() {
        try (BatchInsert batch = new BatchInsert(batchLimit)) {
            while (!queue.isEmpty() || started) {
                String s = queue.poll();
                if (s == null) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {

                    }
                } else {
                    String[] values = s.split(";");
                    batch.insert(values[0], values[1], values[2], values[3]);
                }
            }
        }
    }

    public void stop() {
        started = false;
    }
}

我通过基本测试尝试了这个

public static void main(String[] args) throws IOException {
    createDummy("/tmp/data.txt", 25_000_000);

    BalanceBatch balance = new BalanceBatch(10);

    balance.startAll();
    try (Stream<String> stream = Files.lines(Paths.get("/tmp/data.txt"))) {
        stream.forEach(balance::send);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    balance.stopAll();
}

public static void createDummy(String file, int nbLine) throws IOException {
    Files.write(Paths.get(file), (Iterable<String>) IntStream.range(0, nbLine).mapToObj(i -> String.format("A%d;B%d;C%d;D%d", i, i, i, i))::iterator);
}

这将打印发送的每一批,并显示最后一批将是相当随机的,因为余额不是“恒定的”。示例:10 个线程,每批 50k 条记录:

Send batch with 50000 records
...
Send batch with 50000 records
Send batch with 15830 records
Send batch with 15844 records
Send batch with 2354 records
Send batch with 14654 records
Send batch with 40181 records
Send batch with 44994 records
Send batch with 38376 records
Send batch with 17187 records
Send batch with 27047 records
Send batch with 33533 records

注释:

警告:createDummy函数将创建一个包含 25_000_000 行的文件(我已对其进行了评论)。这大约是一个 1GB 数据的文件

我需要更多时间来做一些基准测试,目前我没有任何用于大规模插入的数据库。

<小时/>

混合使用这个多线程文件读取器和批处理应该会给你带来好的结果。
请注意,这可能不是多线程的最佳实现,我从来没有研究过这个主题。我愿意接受建议/改进。

关于java - 处理文件和数据库插入的最快方法 - Java 多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49811474/

相关文章:

java - activemq-all "5.15.3"不适用于 Spring 5

java反编译

ruby-on-rails - ActionMailer::Base.default_url_options线程安全吗?

java - 在Java中使用ThreadPoolExecutor时如何通过超时取消特定任务?

database - 在使用 Grails 作为框架时尝试查看 H2 数据库中的表

java - 使用 H2 插入外键时出现问题

PostgreSQL 到 H2 引擎 : Data source setup issues

java - float 范围怎么这么大,有 4 个字节 (±3.40282347E+38)

java - 如何处理文件/文件夹路径中的不同语言

multithreading - 具有定期唤醒功能的事件监听器的高效 C++11 设计?