java - 从 CSV 加载时的 PostgreSQL/JooQ 批量插入性能问题;我如何改进流程?

标签 java sql postgresql jooq

对于 this project ,我打算制作一个 web 版本,现在正在制作一个 PostgreSQL (9.x) 后端,webapp 将从中查询。

现在,跟踪器生成一个包含两个 CSV 的 zip 文件,在运行时将其加载到 H2 数据库中,其模式是这样的(是的,我知道 SQL 可以写得稍微更好):

create table matchers (
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null
);

alter table matchers add primary key(id);

create table nodes (
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null
);

alter table nodes add primary key(id);
alter table nodes add foreign key (matcher_id) references matchers(id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

现在,由于 PostgreSQL 数据库将能够处理多个跟踪,我不得不再添加一张表; PostgreSQL 后端的架构看起来像这样(也低于平均 SQL 警报;此外,在 parse_info 表中,content 列包含已解析文件的全文,在 zip 文件中单独存储):

create table parse_info (
    id uuid primary key,
    date timestamp not null,
    content text not null
);

create table matchers (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    class_name varchar(255) not null,
    matcher_type varchar(30) not null,
    name varchar(1024) not null,
    unique (parse_info_id, id)
);

create table nodes (
    parse_info_id uuid references parse_info(id),
    id integer not null,
    parent_id integer not null,
    level integer not null,
    success integer not null,
    matcher_id integer not null,
    start_index integer not null,
    end_index integer not null,
    time bigint not null,
    unique (parse_info_id, id)
);

alter table nodes add foreign key (parse_info_id, matcher_id)
    references matchers(parse_info_id, id);
create index nodes_parent_id on nodes(parent_id);
create index nodes_indices on nodes(start_index, end_index);

现在,我正在做的是获取现有的 zip 文件并将它们插入到 postgresql 数据库中;我正在使用 JooQ 及其 CSV loading API .

过程有点复杂...以下是当前的步骤:

  • 生成一个UUID;
  • 我从 zip 中读取必要的信息(解析日期、输入文本)并将记录写入 parse_info 表中;
  • 我创建了 CSV 的临时副本,以便 JooQ 加载 API 能够使用它(请参阅代码摘录后的原因);
  • 我插入所有匹配器,然后插入所有节点。

代码如下:

public final class Zip2Db2
{
    private static final Pattern SEMICOLON = Pattern.compile(";");
    private static final Function<String, String> CSV_ESCAPE
        = TraceCsvEscaper.ESCAPER::apply;

    // Paths in the zip to the different components
    private static final String INFO_PATH = "/info.csv";
    private static final String INPUT_PATH = "/input.txt";
    private static final String MATCHERS_PATH = "/matchers.csv";
    private static final String NODES_PATH = "/nodes.csv";

    // Fields to use for matchers zip insertion
    private static final List<Field<?>> MATCHERS_FIELDS = Arrays.asList(
        MATCHERS.PARSE_INFO_ID, MATCHERS.ID, MATCHERS.CLASS_NAME,
        MATCHERS.MATCHER_TYPE, MATCHERS.NAME
    );

    // Fields to use for nodes zip insertion
    private static final List<Field<?>> NODES_FIELDS = Arrays.asList(
        NODES.PARSE_INFO_ID, NODES.PARENT_ID, NODES.ID, NODES.LEVEL,
        NODES.SUCCESS, NODES.MATCHER_ID, NODES.START_INDEX, NODES.END_INDEX,
        NODES.TIME
    );

    private final FileSystem fs;
    private final DSLContext jooq;
    private final UUID uuid;

    private final Path tmpdir;

    public Zip2Db2(final FileSystem fs, final DSLContext jooq, final UUID uuid)
        throws IOException
    {
        this.fs = fs;
        this.jooq = jooq;
        this.uuid = uuid;

        tmpdir = Files.createTempDirectory("zip2db");
    }

    public void removeTmpdir()
        throws IOException
    {
        // From java7-fs-more (https://github.com/fge/java7-fs-more)
        MoreFiles.deleteRecursive(tmpdir, RecursionMode.KEEP_GOING);
    }

    public void run()
    {
        time(this::generateMatchersCsv, "Generate matchers CSV");
        time(this::generateNodesCsv, "Generate nodes CSV");
        time(this::writeInfo, "Write info record");
        time(this::writeMatchers, "Write matchers");
        time(this::writeNodes, "Write nodes");
    }

    private void generateMatchersCsv()
        throws IOException
    {
        final Path src = fs.getPath(MATCHERS_PATH);
        final Path dst = tmpdir.resolve("matchers.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            // Throwing below is from throwing-lambdas
            // (https://github.com/fge/throwing-lambdas)
            lines.map(this::toMatchersLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toMatchersLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        Arrays.stream(SEMICOLON.split(input, 4))
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void generateNodesCsv()
        throws IOException
    {
        final Path src = fs.getPath(NODES_PATH);
        final Path dst = tmpdir.resolve("nodes.csv");

        try (
            final Stream<String> lines = Files.lines(src);
            final BufferedWriter writer = Files.newBufferedWriter(dst,
                StandardOpenOption.CREATE_NEW);
        ) {
            lines.map(this::toNodesLine)
                .forEach(Throwing.consumer(writer::write));
        }
    }

    private String toNodesLine(final String input)
    {
        final List<String> parts = new ArrayList<>();
        parts.add('"' + uuid.toString() + '"');
        SEMICOLON.splitAsStream(input)
            .map(s -> '"' + CSV_ESCAPE.apply(s) + '"')
            .forEach(parts::add);
        return String.join(";", parts) + '\n';
    }

    private void writeInfo()
        throws IOException
    {
        final Path path = fs.getPath(INFO_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            final String[] elements = SEMICOLON.split(reader.readLine());

            final long epoch = Long.parseLong(elements[0]);
            final Instant instant = Instant.ofEpochMilli(epoch);
            final ZoneId zone = ZoneId.systemDefault();
            final LocalDateTime time = LocalDateTime.ofInstant(instant, zone);

            final ParseInfoRecord record = jooq.newRecord(PARSE_INFO);

            record.setId(uuid);
            record.setContent(loadText());
            record.setDate(Timestamp.valueOf(time));

            record.insert();
        }
    }

    private String loadText()
        throws IOException
    {
        final Path path = fs.getPath(INPUT_PATH);

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            return CharStreams.toString(reader);
        }
    }

    private void writeMatchers()
        throws IOException
    {
        final Path path = tmpdir.resolve("matchers.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(MATCHERS)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(MATCHERS_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void writeNodes()
        throws IOException
    {
        final Path path = tmpdir.resolve("nodes.csv");

        try (
            final BufferedReader reader = Files.newBufferedReader(path);
        ) {
            jooq.loadInto(NODES)
                .onErrorAbort()
                .loadCSV(reader)
                .fields(NODES_FIELDS)
                .separator(';')
                .execute();
        }
    }

    private void time(final ThrowingRunnable runnable, final String description)
    {
        System.out.println(description + ": start");
        final Stopwatch stopwatch = Stopwatch.createStarted();
        runnable.run();
        System.out.println(description + ": done (" + stopwatch.stop() + ')');
    }

    public static void main(final String... args)
        throws IOException
    {
        if (args.length != 1) {
            System.err.println("missing zip argument");
            System.exit(2);
        }

        final Path zip = Paths.get(args[0]).toRealPath();

        final UUID uuid = UUID.randomUUID();
        final DSLContext jooq = PostgresqlTraceDbFactory.defaultFactory()
            .getJooq();

        try (
            final FileSystem fs = MoreFileSystems.openZip(zip, true);
        ) {
            final Zip2Db2 zip2Db = new Zip2Db2(fs, jooq, uuid);
            try {
                zip2Db.run();
            } finally {
                zip2Db.removeTmpdir();
            }
        }
    }
}

现在,这是我的第一个问题......它比加载到 H2 慢得多。以下是包含 620 个匹配器和 45746 个节点的 CSV 的时序:

Generate matchers CSV: start
Generate matchers CSV: done (45.26 ms)
Generate nodes CSV: start
Generate nodes CSV: done (573.2 ms)
Write info record: start
Write info record: done (311.1 ms)
Write matchers: start
Write matchers: done (4.192 s)
Write nodes: start
Write nodes: done (22.64 s)

给予或接受,忘记有关编写专门 CSV 的部分(见下文),即 25 秒。将其加载到基于磁盘的动态 H2 数据库中只需 不到 5 秒!

我遇到的另一个问题是我必须编写专用的 CSV;似乎 CSV 加载 API 在接受的内容上并不是很灵活,例如,我不得不转这一行:

328;SequenceMatcher;COMPOSITE;token

进入这个:

"some-randome-uuid-here";"328";"SequenceMatcher";"COMPOSITE";"token"

但我最大的问题实际上是这个 zip 非常小。例如,我有一个 zip 不是 620,而是 1532 个匹配器,不是 45746 个节点,而是超过 3400 万个节点;即使我们忽略 CSV 生成时间(原始节点 CSV 为 1.2 GiB),由于 H2 注入(inject)需要 20 分钟,将其乘以 5 会得到 1 小时 30 分以南某个点的时间,这很多!

总而言之,目前这个过程效率很低......


现在,为 PostgreSQL 辩护:

  • 对 PostgreSQL 实例的约束远高于对 H2 实例的约束:我不需要生成的 zip 文件中的 UUID;
  • H2 针对写入进行了“不安全”调整:jdbc:h2:/path/to/db;LOG=0;LOCK_MODE=0;UNDO_LOG=0;CACHE_SIZE=131072

尽管如此,插入时间的这种差异似乎有点过分,但我确信它可以更好。但我不知道从哪里开始。

另外,我知道 PostgreSQL 有一个专门的机制来从 CSV 加载,但这里的 CSV 是在一个 zip 文件中开始的,我真的很想避免像我现在这样创建一个专用的 CSV做...理想情况下,我想直接从 zip 中逐行读取(这是我为 H2 注入(inject)所做的),转换行并写入 PostgreSQL 模式。

最后,我还知道我目前没有在插入前禁用对 PostgreSQL 模式的约束;我还没有尝试过这个(它会有所作为吗?)。

那么,您建议我如何提高性能?

最佳答案

从 CSV 文件批量插入 PostgreSQL 的最快方法是使用 Copy . COPY 命令针对插入大量行进行了优化。

对于 Java,您可以使用 Copy implementation for PostgreSQL JDBC driver

这里有一个很好的小例子来说明如何使用它:how to copy a data from file to PostgreSQL using JDBC?

如果你有一个带有标题的 CSV,你会想要运行一个类似于这样的命令:

\COPY mytable FROM '/tmp/mydata.csv' DELIMITER ';' CSV 标题

将大量数据添加到现有表时的另一个性能提升是删除索引,插入数据,然后重新创建索引。

关于java - 从 CSV 加载时的 PostgreSQL/JooQ 批量插入性能问题;我如何改进流程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29749615/

相关文章:

sql - 使用非常大的结果集查询 Postgresql

postgresql - 使用 Sequelize 批量创建模型实例时,如何设置外键列?

java - 为什么我的 Maven Surefire 插件被忽略?

java - 获取字符串的具体值

sql - 我应该在数据库模式中允许空值吗?

java - JDBC - 选择列为 NULL 的位置

PostgreSQL 公共(public)访问数据库

java - 双输出递归函数

java - 并发检查栈是否为空

SQLite - 我们可以将参数从批处理文件传递到 .sql 文件吗?