mysql - 在Java应用程序中使用JDBC并行读取大数据的标准算法或模式

标签 mysql apache-spark jdbc

下面是一个简单的程序,它从 MySQL 读取数据并将其存储在 CSV 文件中。如果查询返回超过 1000 万条记录,速度就会很慢。

我完全理解,要并行执行,我们需要像这样的过程

  1. 从查询中获取记录数(select * from users)
  2. 然后使用适当的方法将查询分解为并行 block ( select * from users where state = 'CA' )
  3. 然后数据可以在 50 个线程中并行读取或跨进程分布。

Apache Spark 使用具有下上限和分区数量的partition_column,如下所示。

我很想知道是否有一种方法/模式/算法可以在非 Spark 应用程序中使用来并行获取大量数据。不过,我将查看以下实现的 Spark 代码。

https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

spark.read("jdbc")
  .option("url", url)
  .option("dbtable", "pets")
  .option("user", user)
  .option("password", password)
  .option("numPartitions", 10)
  .option("partitionColumn", "owner_id")
  .option("lowerBound", 1)
  .option("upperBound", 10000)
  .load()

SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000

在 CSV 文件中读取和存储数据的简单 MySQL 代码

public static void main(String[] args)
{
    try
    {
        String myDriver = "org.gjt.mm.mysql.Driver";
        String myUrl = "jdbc:mysql://localhost/test";
        Class.forName(myDriver);
        Connection conn = DriverManager.getConnection(myUrl, "root", "");
        String query = "SELECT * FROM users";
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery(query);
        StringBuilder sb = new StringBuilder();
        while (rs.next())
        {
            int id = rs.getInt("id");
            String firstName = rs.getString("first_name");
            String lastName = rs.getString("last_name");
            Date dateCreated = rs.getDate("date_created");
            boolean isAdmin = rs.getBoolean("is_admin");
            int numPoints = rs.getInt("num_points");
            sb.append(String.format("%s, %s, %s, %s, %s, %s\n", id, firstName, lastName, dateCreated, isAdmin, numPoints));
        }

        try (FileOutputStream oS = new FileOutputStream(new File("aFile.csv"))) {
            oS.write(sb.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
        st.close();
    }
    catch (Exception e)
    {
        System.err.println("Got an exception! ");
        System.err.println(e.getMessage());
    }
}

最佳答案

这并不能准确回答你的问题,但是SELECT DATA INTO OUTFILE可以帮助您快速导出数据。

以下是根据您的情况生成 CSV 文件的命令示例,

SELECT * 
  INTO OUTFILE '/some/path/to/users.csv'
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
 LINES TERMINATED BY '\n'
  FROM users;

这使用快速路径将数据写入文件系统,并且可能比线程方法更快。 当然更容易编程。

在如此大容量的查询之前使用 SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; 总是一个好主意,以避免阻止对表的插入和更新。

如果您将使用多个 Java 线程来检索数据,我建议您使用此策略:

  1. 在生成线程之前,通过执行以下查询确定最大的 id 值:SELECT MAX(id) FROM users;

  2. 决定要生成多少个线程。太多的线程会适得其反,因为它们会让你的 MySQL 服务器重载。五十个线程与 MySQL 服务器的连接远远太多了。使用四个或八个。

  3. 为每个线程提供自己的要检索的 id 值段。例如,如果您有 1000 万行和四个线程,则段将为 [1-2500000]、[2500001-5000000]、[5000001-7500000] 和 [7500001-10000000]。

  4. 在每个线程中,打开与 MySQL 的 jdbc 连接,然后执行 WHERE id BETWEENegmentstart ANDsegmentfinish 来选择正确的行。 (MySQL 连接不是线程安全对象)。

  5. SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; 放在 SELECT 查询之前。

id(大概)是users表的主键,因此使用它的WHERE过滤将非常有效。

关于mysql - 在Java应用程序中使用JDBC并行读取大数据的标准算法或模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50275054/

相关文章:

MySQL 根据多个因素查找平均值

MySQL UNION 正在改变数据格式

java - 禁用上下文 LOB 创建,因为 JDBC 驱动程序报告 JDBC 版本 [3] 小于 4

java - 如何将类作为 Java 函数中的参数发送?

java - JDBC RowMapper 和转换

mysql - 为什么此 GROUP BY 子句会导致此错误 "42000Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column..."?

javascript - 从 javascript 日历连接到 MySQL

apache-spark - Spark Partitionby 无法按预期扩展

scala - 当我尝试在 Spark-Scala 中编译 SBT 时,错误 package org.apache.spark.sql is not a value

apache-spark - 是否可以将 Spark-NLP 库与 Spark Structured Streaming 结合使用?