java - Error when writing with Apache Spark SQLContext

Tags: java mysql apache-spark

I'm new to Spark SQL. I followed the Databricks online guide: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

I can connect to my MySQL instance and read from it successfully, but I keep getting various NoTableFound or NoDatabaseFound errors from Spark SQL whenever I try to write. Here is my entire test class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class MySqlConnectionTest {
    private static final String MYSQL_USERNAME = "";
    private static final String MYSQL_PASSWORD = "";
    private static final String MYSQL_HOSTNAME = "";
    private static final String MYSQL_PORT = "";
    private static final String MYSQL_DATABASE = "";
    private static final String MYSQL_URL = "jdbc:mysql://" + MYSQL_HOSTNAME + ":" + MYSQL_PORT + "/" + MYSQL_DATABASE + "?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PASSWORD;

    public static void main(String[] args) {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", MYSQL_USERNAME);
        connectionProperties.put("password", MYSQL_PASSWORD);

        /* First verify we are getting a valid connection!
        try {
            testConnection();
        } catch (Exception e) {
            e.printStackTrace();
        } */

        /*
         * NONE of the writeToSummary methods work! The readFromSummary methods work fine...
         */
//        writeToSummary(connectionProperties);
//        writeToSummaryV2(connectionProperties);
        writeToSummaryV3(connectionProperties);
    }

    private static void testConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        Connection connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
        boolean result = connection.isClosed();
        System.out.println("@@ is connection closed?? ==> " + result);
    }

    private static SparkSession getSparkSession() {
        return SparkSession.builder().master("local[2]").appName("readUsageSummaryV2").getOrCreate();
    }

    private static SQLContext getSqlContext() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("saveUsageSummary")
                .setMaster("local[2]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        return new SQLContext(javaSparkContext);
    }

    private static void readFromSummary(Properties connectionProperties) {
        Dataset<Row> dataSet = getSqlContext().read().jdbc(MYSQL_URL, "summary", connectionProperties);
        dataSet.printSchema();

        dataSet.select("id", "cycle_key", "product", "access_method", "billed", "received_date")
                .limit(5)
                .show();
    }

    private static void readFromSummaryV2(Properties connectionProperties) {
        Dataset<Row> dataSet = getSparkSession().read().jdbc(MYSQL_URL, "summary", connectionProperties);
        dataSet.select("id", "cycle_key", "product", "access_method", "billed", "received_date")
                .limit(5)
                .show();
    }

    private static void writeToSummary(Properties connectionProperties) {
        SQLContext sqlContext = getSqlContext();
        sqlContext.tables("usages")
                .write()
//                .mode(SaveMode.Append)
                .jdbc(MYSQL_URL, "summary", connectionProperties);
    }

    private static void writeToSummaryV2(Properties connectionProperties) {
        SQLContext sqlContext = getSqlContext();
        sqlContext.table("summary")
                .write()
//                .mode(SaveMode.Append)
                .jdbc(MYSQL_URL, "summary", connectionProperties);
    }

    private static void writeToSummaryV3(Properties connectionProperties) {
        SQLContext sqlContext = getSqlContext();
        sqlContext.sql("SELECT * FROM summary LIMIT 5")
                .write()
//                .mode(SaveMode.Append)
                .jdbc(MYSQL_URL, "summary", connectionProperties);
    }

}

Best Answer

The answer is always simple... I re-read the documentation with fresh eyes and realized that for this to work, Dataset.write() has to write something that already exists in the Spark SQL context. So I can make it write a Dataset that was created by reading from the database, like this:

private static void writeToSummaryV4(Properties connectionProperties) {
    Dataset<Row> summary = getSparkSession().read().jdbc(MYSQL_URL, "summary", connectionProperties);
    summary.select("comp_code","cycle_key", "product", "access_method", "billed", "received_date")
            .limit(5)
            .show();

    summary.write().mode(SaveMode.Append).jdbc(MYSQL_URL, "summary", connectionProperties);
} 
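
For completeness, the same rule explains why writeToSummaryV2 and writeToSummaryV3 fail: sqlContext.table("summary") and the SQL query resolve the name "summary" against Spark's own catalog, where no such table exists; the table only lives in MySQL. Below is a minimal sketch of the alternative fix, registering the JDBC Dataset as a temporary view before querying it by name. The method name writeToSummaryV5 and the view name summary_view are illustrative, and the same MYSQL_URL and connection properties are assumed:

// Sketch only: writeToSummaryV5 and summary_view are illustrative names, not from the original post.
private static void writeToSummaryV5(Properties connectionProperties) {
    SparkSession spark = getSparkSession();

    // Read the MySQL table into Spark and register it in Spark's catalog,
    // so that name-based lookups can find it.
    Dataset<Row> summary = spark.read().jdbc(MYSQL_URL, "summary", connectionProperties);
    summary.createOrReplaceTempView("summary_view");

    // Now the name resolves, because the view exists in the Spark SQL context.
    spark.sql("SELECT * FROM summary_view LIMIT 5")
            .write()
            .mode(SaveMode.Append)
            .jdbc(MYSQL_URL, "summary", connectionProperties);
}

Either way, the key point is the same: write() operates on data Spark already holds (a Dataset or a registered view), not on names that exist only in the external database.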

Regarding "java - Error when writing with Apache Spark SQLContext", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43727319/
