java - 使用批量数据设计独立于数据源的应用程序

标签 java mongodb postgresql design-patterns spring-kafka

我们有一个遗留应用程序,可以为每个用户从 mongo 读取数据(查询结果根据用户请求从小到大),并且我们的应用程序为每个用户创建一个文件并放入 FTP 服务器/s3。我们将数据作为 mongo 游标读取,并在获得批量数据后立即将每个批处理写入文件,因此文件写入性能不错。该应用程序运行良好,但绑定(bind)到 mongo 和 mongo 光标。

现在我们必须重新设计这个应用程序,因为我们必须支持不同的数据源,即 MongoDB、Postgres DB、Kinesis、S3 等。到目前为止,我们已经想到了以下想法:

  1. 为每个源构建数据 API 并公开分页的 REST 响应。这是一个可行的解决方案,但对于大型企业来说可能会很慢 查询数据与当前光标响应进行比较。
  2. 通过在kafka中输入批量数据并在文件生成器中读取批量数据流来构建数据抽象层。但大多数时候用户要求排序数据,因此我们需要按顺序读取消息。我们将失去巨大的吞吐量和在写入文件之前组合这些数据消息的大量额外工作的好处。

我们正在寻找一种解决方案来替换当前的 mongo 游标,并使我们的文件生成器独立于数据源。

最佳答案

听起来您本质上是想创建一个 API,在其中可以尽可能保持流式传输的效率,就像您在读取用户数据时写入文件一样。

在这种情况下,您可能需要为 ReadSource 定义一个推送解析器 API,它将数据流式传输到 WriteTarget,后者将数据写入任何内容你有一个实现。排序将在 ReadSource 方面处理,因为对于某些源,您可以按顺序读取(例如从数据库中);对于无法执行此操作的那些源,您可以简单地执行中间步骤来对数据进行排序(例如写入临时表),然后将其流式传输到 WriteTarget

基本的实现可能看起来像这样:

public class UserDataRecord {
    private String data1;
    private String data2;

    public String getRecordAsString() {
        return data1 + "," + data2;
    }
}
<小时/>
public interface WriteTarget<Record> {
    /** Write a record to the target */
    public void writeRecord(Record record);

    /** Finish writing to the target and save everything */
    public void commit();

    /** Undo whatever was written */
    public void rollback();
}
<小时/>
public abstract class ReadSource<Record> {
    protected final WriteTarget<Record> writeTarget;

    public ReadSource(WriteTarget<Record> writeTarget) { this.writeTarget = writeTarget; }

    public abstract void read();
}
<小时/>
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class RelationalDatabaseReadSource extends ReadSource<UserDataRecord> {
    private Connection dbConnection;

    public RelationalDatabaseReadSource (WriteTarget<UserDataRecord> writeTarget, Connection dbConnection) {
        super(writeTarget);
        this.dbConnection = dbConnection;
    }

    @Override public void read() {
        // read user data from DB and encapsulate it in a record
        try (Statement statement = dbConnection.createStatement();
                ResultSet resultSet = statement.executeQuery("Select * From TABLE Order By COLUMNS");) {
            while (resultSet.next()) {
                UserDataRecord record = new UserDataRecord();
                // stream the records to the write target
                writeTarget.writeRecord(record);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
<小时/>
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
public class FileWriteTarget implements WriteTarget<UserDataRecord> {
    private File fileToWrite;
    private PrintWriter writer;

    public FileWriteTarget(File fileToWrite) throws IOException {
        this.fileToWrite = fileToWrite;
        this.writer = new PrintWriter(new FileWriter(fileToWrite));
    }

    @Override public void writeRecord(UserDataRecord record) {
        writer.println(record.getRecordAsString().getBytes(StandardCharsets.UTF_8));
    }

    @Override public void commit() {
        // write trailing records
        writer.close();
    }

    @Override
    public void rollback() {
        try { writer.close(); } catch (Exception e) { }
        fileToWrite.delete();
    }
}
<小时/>

这只是总体思路,需要认真改进。 请任何人随时更新此 API。

关于java - 使用批量数据设计独立于数据源的应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58612321/

相关文章:

node.js - 用户如何使用 sequelize postgres nodejs 来喜欢和不喜欢彼此的帖子?

java - 如何获取关注者和关注列表屏幕名称 (Twitter4J)?

javascript - 未捕获的 TypeError : text. 搜索不是 HTMLLIElement 的函数

java - For 循环卡住了我的程序

node.js - 'this' 在 Mongoose 预保存 Hook 中未定义

sql - 如何在 postgresql 中使用 array_agg 包含 NULL 值?

node.js - 带有回调或异步/等待的 node-postgres 事务?

Java BigInteger 内存分配

mongodb - Mongo shell 立即关闭

node.js - 将范围传递给 db.collection.group