下面是一个简单的程序,它从 MySQL 读取数据并将其存储在 CSV 文件中。如果查询返回超过 1000 万条记录,速度就会很慢。
我完全理解,要并行执行,我们需要像这样的过程
- 从查询中获取记录数(select * from users)
- 然后使用适当的方法将查询分解为并行 block ( select * from users where state = 'CA' )
- 然后数据可以在 50 个线程中并行读取或跨进程分布。
Apache Spark 使用具有下上限和分区数量的partition_column,如下所示。
我很想知道是否有一种方法/模式/算法可以在非 Spark 应用程序中使用来并行获取大量数据。不过,我将查看以下实现的 Spark 代码。
https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3
spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)
.load()
SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000
在 CSV 文件中读取和存储数据的简单 MySQL 代码
public static void main(String[] args)
{
try
{
String myDriver = "org.gjt.mm.mysql.Driver";
String myUrl = "jdbc:mysql://localhost/test";
Class.forName(myDriver);
Connection conn = DriverManager.getConnection(myUrl, "root", "");
String query = "SELECT * FROM users";
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(query);
StringBuilder sb = new StringBuilder();
while (rs.next())
{
int id = rs.getInt("id");
String firstName = rs.getString("first_name");
String lastName = rs.getString("last_name");
Date dateCreated = rs.getDate("date_created");
boolean isAdmin = rs.getBoolean("is_admin");
int numPoints = rs.getInt("num_points");
sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
}
try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
oS.write(sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
st.close();
}
catch (Exception e)
{
System.err.println("Got an exception! ");
System.err.println(e.getMessage());
}
}
最佳答案
这并不能准确回答你的问题,但是SELECT DATA INTO OUTFILE
可以帮助您快速导出数据。
以下是根据您的情况生成 CSV 文件的命令示例,
SELECT *
INTO OUTFILE '/some/path/to/users.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM users;
这使用快速路径将数据写入文件系统,并且可能比线程方法更快。 当然更容易编程。
在如此大容量的查询之前使用 SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
总是一个好主意,以避免阻止对表的插入和更新。
如果您将使用多个 Java 线程来检索数据,我建议您使用此策略:
在生成线程之前,通过执行以下查询确定最大的
id
值:SELECT MAX(id) FROM users;
决定要生成多少个线程。太多的线程会适得其反,因为它们会让你的 MySQL 服务器重载。五十个线程与 MySQL 服务器的连接远远太多了。使用四个或八个。
为每个线程提供自己的要检索的 id 值段。例如,如果您有 1000 万行和四个线程,则段将为 [1-2500000]、[2500001-5000000]、[5000001-7500000] 和 [7500001-10000000]。
在每个线程中,打开与 MySQL 的 jdbc 连接,然后执行
WHERE id BETWEENegmentstart ANDsegmentfinish
来选择正确的行。 (MySQL 连接不是线程安全对象)。将
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
放在 SELECT 查询之前。
id
(大概)是users
表的主键,因此使用它的WHERE
过滤将非常有效。
关于mysql - 在Java应用程序中使用JDBC并行读取大数据的标准算法或模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50275054/