java - How do I sync a very large Postgres database table to web software using REST API calls?

Tags: java postgresql data-migration

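Here are some things you should know:

  1. I have a Postgres database with a table "account" that contains about one million records.
  2. I have a Vbout account and need to sync the records in the "account" table to it as contacts using the Vbout REST API ( https://developers.vbout.com/docs/1_0/#emailmarketing_addcontact ).
  3. I wrote a Python script that generates random data and populates the Postgres table with rows for testing.

I wrote a Java program to perform this sync:

  1. My Java program first counts the records in the database, then divides them into ranges (say there are 100 records and I want to run 5 threads; the ranges are 1-20, 21-40, 41-60, 61-80, 81-100) and processes each range in a separate thread to make the process faster.
  2. Each thread fetches a record, creates an object for it, and pushes that object onto a queue that I implemented with an ArrayList.

  3. I have 100 threads for posting the data to Vbout. These threads take an object from the queue and make a REST API call to Vbout to create a contact there.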
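The range split described in step 1 can be sketched with integer arithmetic alone. This is only an illustration: the class name `RangeSplit` and the method `split` are hypothetical and are not taken from the program in this question. Each range is an inclusive, 1-based `{from, to}` pair, and any remainder is spread over the first ranges.

```java
import java.util.ArrayList;
import java.util.List;

class RangeSplit {

    // Splits positions 1..total into at most `parts` contiguous ranges.
    // Each element of the result is an inclusive {from, to} pair.
    static List<int[]> split(int total, int parts) {
        List<int[]> ranges = new ArrayList<>();
        int base = total / parts;   // minimum size of every range
        int extra = total % parts;  // the first `extra` ranges get one more row
        int from = 1;
        for (int i = 0; i < parts && from <= total; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges.add(new int[] { from, from + size - 1 });
            from += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The example from the text: 100 records, 5 threads.
        for (int[] r : split(100, 5)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

For 100 records and 5 parts this yields the ranges 1-20, 21-40, 41-60, 61-80 and 81-100 from the example above.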

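Here are the problems I am facing:

  1. The program runs fine for 10,000 records but does not work for one million records.
  2. When I populate the table with one million records, both PgAdmin and my program crash.
  3. With one million records, my program shows a connection error and PgAdmin shows a connection-lost error.

Please help. I have not been able to solve this.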

import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicNameValuePair;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;


//DB Controller Class used to get Database Connection
//Have made it generic for future use
class DBController {

    private static final String dbName = "dbname";
    private static final String username = "postgres";
    private static final String password = "password";
    private static final String host = "localhost";


    public static Connection getDBConnection() {
        try {
            Class.forName("org.postgresql.Driver");
            Connection connection = DriverManager
                    .getConnection("jdbc:postgresql://" + host + "/" + dbName,
                            username, password);


            return connection;
        } catch (Exception e) {
            System.out.println("Connection To Database failed due to following reasons:");
            e.printStackTrace();
        }

        return null;
    }

}


/*
 * Range Objects are used to divide all the records of the table
 * into parts. Then all the parts are processed by different threads to make the sync faster.
 * */
class Range {
    int from;
    int to;
}

//Vbout Contact Objects store the property for posting into the Vbout API
class VboutContact {
    String firstName;
    String lastName;
    String email;
    String phone;
}


//This is the main class and this should be run only once (THE FIRST TIME) to do an initial sync
public class FirstSync {

    static int count=0;
    static int testCount=0;

    private static final String listId = "######";
    private static final String vboutApiKey = "#######";

    //Default Vbout Threads
    private static final int vboutThreads = 500;

    //Default Database Threads (eg. 5 Threads = 5 Range Objects)
    private static final int dbThreads = 100;

    private static int countRecords = 0;


    //We maintain a queue of contacts that every thread add the records into
    private static volatile ArrayList<VboutContact> contactQueue = new ArrayList<VboutContact>();


    public static void main(String args[]) {

        System.out.println("Connecting to database...");


        System.out.println("Connected to database!");


        countRecords = countRecords();
        doSync();


        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                doPostVbout();
            }
        };

        //Runs vboutThreads threads for posting data into Vbout
        for (int i = 0; i < vboutThreads; i++) {
            new Thread(r1).start();
        }

    }

    public static void doPostVbout() {
        while (true) {
            try {
                if (contactQueue.size() > 0) {
                    VboutContact vboutContact = null;
                    synchronized (contactQueue) {
                        vboutContact = contactQueue.remove(0);
                    }
                    System.out.println("Processing Vbout Contact : " + vboutContact.email+" - Name : "+vboutContact.firstName+" "+vboutContact.lastName+" Phone : "+vboutContact.phone);
                    HttpClient httpclient = HttpClients.createDefault();
                    HttpPost httppost = new HttpPost("https://api.vbout.com/1/emailmarketing/addcontact.json?key=" + vboutApiKey);
                    List<NameValuePair> params = new ArrayList<NameValuePair>();
                    params.add(new BasicNameValuePair("email", vboutContact.email));
                    params.add(new BasicNameValuePair("status", "Active"));
                    params.add(new BasicNameValuePair("listid", listId));
                    params.add(new BasicNameValuePair("fields[161784]", vboutContact.firstName));
                    params.add(new BasicNameValuePair("fields[161785]", vboutContact.lastName));
                    params.add(new BasicNameValuePair("fields[161787]", vboutContact.phone));
                    httppost.setEntity(new UrlEncodedFormEntity(params));
                    HttpResponse response = httpclient.execute(httppost);
                    System.out.println(response.toString());
                    count++;
                    System.out.println("\n\n\nProgress : "+count+" / "+countRecords);
                }
            } catch (Exception e) {
            }
            System.out.println("Total Count Records: "+countRecords+" - Total Fetched Records"+testCount);
        }
    }

    public static boolean doSync() {
        boolean syncResultBool = false;
        try {

            //Counts Total Records in the database
            int count = countRecords;

            //Breaks the total number of records into ranges
            ArrayList<Range> ranges = getRanges(count);

            //Runs Thread of each range
            for (int i = 0; i < ranges.size(); i++) {
                Range thisRange = ranges.get(i);
                Runnable r1 = new Runnable() {
                    @Override
                    public void run() {
                        doSyncThread(thisRange);
                    }
                };
                Thread t1 = new Thread(r1);
                t1.start();
            }
        } catch (Exception e) {
            System.out.println("Sync Crashed Due To Following Issues");
        }
        return syncResultBool;
    }


    public static void doSyncThread(Range range) {
        try {
            System.out.println("Sync Running for " + range.from + " : " + range.to);
            Connection connection=DBController.getDBConnection();
            PreparedStatement preparedStatement = connection.prepareStatement("SELECT id,firstname,lastname,phone,personemail FROM public.account LIMIT ? OFFSET ?");
            preparedStatement.setInt(1, (range.to - range.from)+1);
            System.out.println("\n\n\n\n\nLIMIT "+(range.to - range.from+1)+"OFFSET "+(range.from-1)+"\n\n\n");
            preparedStatement.setInt(2, range.from-1);

            ResultSet rs = preparedStatement.executeQuery();
            while (rs.next()) {

                VboutContact vboutContact = new VboutContact();
                vboutContact.firstName = rs.getString(2);
                vboutContact.lastName = rs.getString(3);
                //vboutContact.phone = rs.getString(4);
                vboutContact.phone = "9999999999";
                vboutContact.email = rs.getString(5).toLowerCase()+"@example.com";

                System.out.println("Adding Record to Queue : " + vboutContact.email);
                //Adds records to queue
                synchronized (contactQueue) {
                    contactQueue.add(vboutContact);
                    testCount++;
                }
            }
            rs.close();
            preparedStatement.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Sync Crashed Due To Following Issues");
            e.printStackTrace();
        }
    }

    public static int countRecords() {
        int count = 0;
        try {

            Connection connection=DBController.getDBConnection();
            // make sure autocommit is off
            connection.setAutoCommit(false);
            Statement st = connection.createStatement();
            st.setFetchSize(100);
            ResultSet rs = st.executeQuery("select count(id) from account;");
            while (rs.next()) {
                count = rs.getInt(1);
                System.out.print("a row was returned.");
            }
            rs.close();
            st.close();
            connection.close();

        } catch (Exception e) {
            System.out.println("Sync Crashed Due To Following Issues");
            e.printStackTrace();
        }
        System.out.println("Total Records : " + count);
        countRecords=count;
        return count;
    }

    public static ArrayList<Range> getRanges(int count) {
        ArrayList<Range> arrayList = new ArrayList<Range>();
        int threads = dbThreads;
        int parts = count / threads;
        float partf = ((float) count / (float) threads) - (float) parts;
        int temp = Math.round(partf * threads);
        int d = parts;
        int t = 0;
        for (int i = 2; i <= threads + 1; i++) {
            Range range = new Range();
            range.from = t + 1;
            t = (i - 1) * d;
            range.to = t;
            range.to = (i == threads + 1) ? t + temp : t;
            arrayList.add(range);
        }
        return arrayList;
    }

}

Best answer

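One million records is not even "large" for PostgreSQL, let alone "very large". Using 100 threads to read one million records from the database is the problem, not the solution. What made you believe that would be a useful thing to do? Maybe you can benefit from threads for Vbout (I don't know), but that doesn't mean you need them for PostgreSQL as well.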
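One way to act on this, sketched under assumptions rather than offered as a finished implementation: a single connection reads the table once, a bounded queue keeps memory use flat, and a small fixed pool of workers does the posting. The class name `SingleReaderSync`, the queue size, the worker count, and the `sender` callback are all hypothetical; the real HTTP call to Vbout is left as a placeholder. The query and connection settings are the ones from the question. With the PostgreSQL JDBC driver, `setFetchSize` only makes the driver fetch rows in batches when autocommit is off.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SingleReaderSync {

    // End-of-data marker; workers compare by identity and stop when they take it.
    static final String[] POISON = new String[0];

    // Starts `workers` threads that take rows from the queue and hand them to `sender`.
    static ExecutorService startWorkers(BlockingQueue<String[]> queue, int workers,
                                        Consumer<String[]> sender) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    for (String[] row = queue.take(); row != POISON; row = queue.take()) {
                        try {
                            sender.accept(row);
                        } catch (RuntimeException e) {
                            e.printStackTrace(); // report the failure, keep the worker alive
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return pool;
    }

    // Reads the whole table on ONE connection; put() blocks while the queue is full.
    static void readAll(Connection conn, BlockingQueue<String[]> queue) throws Exception {
        conn.setAutoCommit(false); // needed for batched fetching with the PostgreSQL driver
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT firstname, lastname, phone, personemail FROM public.account")) {
            ps.setFetchSize(1000);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    queue.put(new String[] {
                        rs.getString(1), rs.getString(2), rs.getString(3), rs.getString(4) });
                }
            }
        }
    }

    // Sends one end marker per worker, then waits for the pool to drain.
    static void finish(BlockingQueue<String[]> queue, int workers, ExecutorService pool)
            throws InterruptedException {
        for (int i = 0; i < workers; i++) {
            queue.put(POISON);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    public static void main(String[] args) throws Exception {
        int workers = 8; // assumption: tune to what the remote API tolerates
        BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(10_000);
        AtomicInteger sent = new AtomicInteger();
        // Placeholder sender: a real one would POST the row to the Vbout API.
        ExecutorService pool = startWorkers(queue, workers, row -> sent.incrementAndGet());
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost/dbname", "postgres", "password")) {
            readAll(conn, queue);
        } finally {
            finish(queue, workers, pool);
        }
        System.out.println("Rows handed to sender: " + sent.get());
    }
}
```

The design choice here is that the database side uses exactly one connection regardless of table size, while concurrency is confined to the HTTP side, where it may actually help.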

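You say PgAdmin crashed, but you don't describe any use of PgAdmin. What were you using it for? What was the exact error message?

You say your Java program crashed. What was the exact error message PostgreSQL issued?

A good step in debugging code is to read the error messages. If you need help interpreting them, you need to show us your error messages.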
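To make those messages visible, an exception handler can print the full cause chain instead of swallowing the exception. The helper below is a minimal sketch; the class name `ErrorReport` and the method `describe` are hypothetical, and the exception in `main` is fabricated purely to exercise the helper. `SQLException.getSQLState()` is standard JDBC and returns the five-character state code reported by the database or driver.

```java
import java.sql.SQLException;

class ErrorReport {

    // Builds one line per exception in the cause chain, including the
    // SQLState for any SQLException found along the way.
    static String describe(Throwable t) {
        StringBuilder sb = new StringBuilder();
        for (Throwable c = t; c != null; c = c.getCause()) {
            sb.append(c.getClass().getName()).append(": ").append(c.getMessage());
            if (c instanceof SQLException) {
                sb.append(" [SQLState=").append(((SQLException) c).getSQLState()).append("]");
            }
            sb.append(System.lineSeparator());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Fabricated example exception, not an error reported in the question.
        Exception e = new RuntimeException("sync failed",
                new SQLException("connection refused", "08001"));
        System.err.print(describe(e));
    }
}
```

Calling something like `describe(e)` inside each `catch` block, including the ones that are currently empty, gives an exact message to post alongside the question.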

Regarding "java - How do I sync a very large Postgres database table to web software using REST API calls?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60848950/
