以下是您应该了解的一些事项:
- 我有一个 postgres 数据库,其中有一个表“account”,其中包含大约一百万条记录。
- 我有一个 Vbout 帐户,需要使用 Vbout Rest API ( https://developers.vbout.com/docs/1_0/#emailmarketing_addcontact ) 将“帐户”表中的记录同步为联系人。
- 我创建了一个 python 脚本来生成随机数据并将行填充到 postgres 表中进行测试。
我编写了一个 Java 程序来执行此同步:
- 我的java程序首先计算数据库中的记录数,然后将记录划分为范围(假设有100条记录,我想运行5个线程,范围是1-20、21-40、41-60 , 61-80, 81-100) 并使用单独的线程处理每个范围以使过程更快。
每个线程获取一条记录并为该记录创建一个对象,并将该记录推送到我使用 Arraylist 实现的队列中。
我有 100 个线程用于将数据发布到 Vbout。这些线程从队列中获取一个对象,并对 vbout 进行 REST API 调用以在其中创建联系人。
这是我面临的问题:
- 该程序对于 10000 条记录运行良好,但对于 100 万条记录则无法正常工作。
- 当我向表中填充一百万条记录时,PgAdmin 以及我的程序崩溃了。
- 对于一百万条记录,我的程序显示连接错误,PgAdmin 显示连接丢失错误。
请帮助我。我无法解决这个问题。
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
//DB Controller Class used to get Database Connection
//Have made it generic for future use
class DBController {
private static final String dbName = "dbname";
private static final String username = "postgres";
private static final String password = "password";
private static final String host = "localhost";
public static Connection getDBConnection() {
try {
Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
.getConnection("jdbc:postgresql://" + host + "/" + dbName,
username, password);
return connection;
} catch (Exception e) {
System.out.println("Connection To Database failed due to following reasons:");
e.printStackTrace();
}
return null;
}
}
/*
* Range Objects are used to divide all the records of the table
* into parts. Then all the parts are processed by different threads to make the sync faster.
* */
class Range {
int from;
int to;
}
//Vbout Contact Objects store the property for posting into the Vbout API
class VboutContact {
String firstName;
String lastName;
String email;
String phone;
}
//This is the main class and this should be run only once (THE FIRST TIME) to do an initial sync
public class FirstSync {
static int count=0;
static int testCount=0;
private static final String listId = "######";
private static final String vboutApiKey = "#######";
//Default Vbout Threads
private static final int vboutThreads = 500;
//Default Database Threads (eg. 5 Threads = 5 Range Objects)
private static final int dbThreads = 100;
private static int countRecords = 0;
//We maintain a queue of contacts that every thread add the records into
private static volatile ArrayList<VboutContact> contactQueue = new ArrayList<VboutContact>();
public static void main(String args[]) {
System.out.println("Connecting to database...");
System.out.println("Connected to database!");
countRecords = countRecords();
doSync();
Runnable r1 = new Runnable() {
@Override
public void run() {
doPostVbout();
}
};
//Runs vboutThreads number of threads for posting data into vbout default (3)
for (int i = 0; i < vboutThreads; i++) {
new Thread(r1).start();
}
}
public static void doPostVbout() {
while (true) {
try {
if (contactQueue.size() > 0) {
VboutContact vboutContact = null;
synchronized (contactQueue) {
vboutContact = contactQueue.remove(0);
}
System.out.println("Processing Vbout Contact : " + vboutContact.email+" - Name : "+vboutContact.firstName+" "+vboutContact.lastName+" Phone : "+vboutContact.phone);
HttpClient httpclient = HttpClients.createDefault();
HttpPost httppost = new HttpPost("https://api.vbout.com/1/emailmarketing/addcontact.json?key=" + vboutApiKey);
List<NameValuePair> params = new ArrayList<NameValuePair>();
params.add(new BasicNameValuePair("email", vboutContact.email));
params.add(new BasicNameValuePair("status", "Active"));
params.add(new BasicNameValuePair("listid", listId));
params.add(new BasicNameValuePair("fields[161784]", vboutContact.firstName));
params.add(new BasicNameValuePair("fields[161785]", vboutContact.lastName));
params.add(new BasicNameValuePair("fields[161787]", vboutContact.phone));
httppost.setEntity(new UrlEncodedFormEntity(params));
HttpResponse response = httpclient.execute(httppost);
System.out.println(response.toString());
count++;
System.out.println("\n\n\nProgress : "+count+" / "+countRecords);
}
} catch (Exception e) {
}
System.out.println("Total Count Records: "+countRecords+" - Total Fetched Records"+testCount);
}
}
public static boolean doSync() {
boolean syncResultBool = false;
try {
//Counts Total Reords in the database
int count = countRecords;
//Breaks the total number of records into ranges
ArrayList<Range> ranges = getRanges(count);
//Runs Thread of each range
for (int i = 0; i < ranges.size(); i++) {
Range thisRange = ranges.get(i);
Runnable r1 = new Runnable() {
@Override
public void run() {
doSyncThread(thisRange);
}
};
Thread t1 = new Thread(r1);
t1.start();
}
} catch (Exception e) {
System.out.println("Sync Crashed Due To Following Issues");
}
return syncResultBool;
}
public static void doSyncThread(Range range) {
try {
System.out.println("Sync Running for " + range.from + " : " + range.to);
Connection connection=DBController.getDBConnection();
PreparedStatement preparedStatement = connection.prepareStatement("SELECT id,firstname,lastname,phone,personemail FROM public.account LIMIT ? OFFSET ?");
preparedStatement.setInt(1, (range.to - range.from)+1);
System.out.println("\n\n\n\n\nLIMIT "+(range.to - range.from+1)+"OFFSET "+(range.from-1)+"\n\n\n");
preparedStatement.setInt(2, range.from-1);
ResultSet rs = preparedStatement.executeQuery();
while (rs.next()) {
VboutContact vboutContact = new VboutContact();
vboutContact.firstName = rs.getString(2);
vboutContact.lastName = rs.getString(3);
//vboutContact.phone = rs.getString(4);
vboutContact.phone = "9999999999";
vboutContact.email = rs.getString(5).toLowerCase()+"@example.com";
System.out.println("Adding Record to Queue : " + vboutContact.email);
//Adds records to queue
synchronized (contactQueue) {
contactQueue.add(vboutContact);
testCount++;
}
}
rs.close();
preparedStatement.close();
connection.close();
} catch (Exception e) {
System.out.println("Sync Crashed Due To Following Issues");
e.printStackTrace();
}
}
public static int countRecords() {
int count = 0;
try {
Connection connection=DBController.getDBConnection();
// make sure autocommit is off
connection.setAutoCommit(false);
Statement st = connection.createStatement();
st.setFetchSize(100);
ResultSet rs = st.executeQuery("select count(id) from account;");
while (rs.next()) {
count = rs.getInt(1);
System.out.print("a row was returned.");
}
rs.close();
st.close();
connection.close();
} catch (Exception e) {
System.out.println("Sync Crashed Due To Following Issues");
e.printStackTrace();
}
System.out.println("Total Records : " + count);
countRecords=count;
return count;
}
public static ArrayList<Range> getRanges(int count) {
ArrayList<Range> arrayList = new ArrayList<Range>();
int threads = dbThreads;
int parts = count / threads;
float partf = ((float) count / (float) threads) - (float) parts;
int temp = Math.round(partf * threads);
int d = parts;
int t = 0;
for (int i = 2; i <= threads + 1; i++) {
Range range = new Range();
range.from = t + 1;
t = (i - 1) * d;
range.to = t;
range.to = (i == threads + 1) ? t + temp : t;
arrayList.add(range);
}
return arrayList;
}
}
最佳答案
对于 PostgreSQL 来说,一百万条记录甚至不算“大”,更不用说“非常大”了。使用 100 个线程从数据库读取一百万条记录是问题所在,而不是解决方案。是什么让您相信这是一件有用的事情?也许您可以将线程用于 vbout(我不知道),但这并不意味着您还需要将它们用于 PostgreSQL。
您说 PgAdmin 崩溃了,但您没有描述 PgAdmin 的任何用法。你用它做什么?确切的错误消息是什么?
你说你的java程序崩溃了。 PostgreSQL 发出的确切错误消息是什么?
调试代码的一个好步骤是阅读错误消息。如果您需要帮助解释它们,那么您需要向我们展示您的错误消息。
关于java - 如何使用 REST API 调用将非常大的 postgres 数据库表同步到 Web 软件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60848950/