java - 使用 Spark 从 HDFS 到 Oracle BLOB 的 CSV 文件

标签 java oracle apache-spark

我正在开发一个 Java 应用程序,它使用 Spark 2.3.1 将数据从 Oracle 加载到 HDFS,反之亦然。 我想在 HDFS 中创建 CSV 文件,然后将其加载到 Oracle (12.2) BLOB。

代码..

        //create Dataset
        Dataset<Row> dataset = SparkService.sql("select * from test_table");
        String trgtFileWithPath = "/tmp/test_table.csv";      

        //save file in HDFS
        dataset.write().mode("overwrite").format("csv").save(trgtFileWithPath);

        //get file from HDFS
        JavaSparkContext jsc = SparkContextUtil.getJavaSparkContext("appId");
        JavaRDD<String> textFile = jsc.textFile(trgtFileWithPath);

        //Call Oracle package, that inserts into table with BLOB field
        File csvFile = new File("/tmp/ETLFramework/test_table1.csv");
        BufferedInputStream bis = new BufferedInputStream(new FileInputStream(csvFile), 500);
        Connection conn = tbl.getJdbcConnection(); //there is tbl var with java.sql.Connection
        CallableStatement cstmt = conn.prepareCall(String.format("{call %s(?, ?, ?)}", "ORACLE_API_FOR_ETL_FRAMEWORK.INSERT_LOB"));
        cstmt.setString(1, "FILE_TO_LOB");
        cstmt.setString(2, "/tmp/test_table.csv");
        cstmt.setClob(3, bis, (int) csvFile.length());
        cstmt.execute();

        if (!conn.getAutoCommit()) {
            conn.commit();
        }

我是 Spark 新手..所以任何想法请如何将 JavaRDD 转换为 BufferedInputStream,或者摆脱上面的困惑并以更理智的方式将数据集放入 Oracle BLOB..

谢谢

最佳答案

最后..经过几天与 Oracle、Hadoop 和 Spark 的斗争,我找到了适合我的任务的解决方案:

        try {
        String trgtFolderPath = "tmp/ETLFramework/csv/form_name";
        Configuration conf = new Configuration();
        String hdfsUri = "hdfs://" + /*nameNode*/ + ":" + /*hdfsPort*/;
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsUri), conf);
        RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem.listFiles(new Path(trgtFolderPath), true);
        while(fileStatusListIterator.hasNext()){
            LocatedFileStatus fileStatus = fileStatusListIterator.next();
            String fileName = fileStatus.getPath().getName();
            if (fileName.contains(".csv") && fileStatus.getLen()>0) {
                log.info("fileName=" + fileName);
                log.info("fileStatus.getLen=" + fileStatus.getLen());
                BufferedInputStream bis = new BufferedInputStream(fileSystem.open(new Path(trgtFolderPath + "/" + fileName)), 500);

                ETLParams param = ETLParams.getParams();
                Connection conn = tbl.getJdbcConnection();
                String apiPackageInsertLOB = ETLService.replaceParams(tbl.getConnection().getFullSchema() + "." + tbl.getApiPackage().getDbTableApiPackageInsertLOB(), param.getParamsByName());
                log.info(String.format("Call %s(%s, %s, %s);", apiPackageInsertLOB, tbl.getFullTableName(), trgtFolderPath + "/" + fileName, "p_nInsertedRows"));
                CallableStatement cstmt = conn.prepareCall(String.format("{call %s(?, ?, ?, ?)}", apiPackageInsertLOB));
                cstmt.setString(1, tbl.getFullTableName());
                cstmt.setString(2, trgtFolderPath + "/" + fileName);
                cstmt.setBlob(3, bis, fileStatus.getLen());
                cstmt.registerOutParameter(4, Types.INTEGER);
                cstmt.execute();
                int rowsInsertedCount = cstmt.getInt(3);
                log.info("Inserted " + rowsInsertedCount + " rows into table blob_file");
                cstmt.close();
            }
        }
        fileSystem.close();
    }
    catch (IOException |
           SQLException exc){
        exc.printStackTrace();
    }

将 2 Gb CSV 从 Spark 数据集写入 HDFS,然后将此 CSV 从 HDFS 读取到 Oracle BLOB 大约需要 5 分钟。

关于java - 使用 Spark 从 HDFS 到 Oracle BLOB 的 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61877050/

相关文章:

java - Apache Spark 上的 CoreNLP

java.util.Calendar.get* 和 java.util.Calendar.set* 是阻塞的

sql - 为历史表的每个外键获取最后一个条目的正确方法是什么?

scala - 如何加载保存的 KMeans 模型(在 ML 管道中)?

java - WCF 与 Spark 集成

json - Spark对json的异常处理

java - 运行 EJB 应用程序

Java 为什么将 long (64) 转换为 float (32) 考虑扩大?

oracle - ClassNotFoundException使用Jetty/Gradle加载Oracle驱动程序

database - Docker/Oracle数据库/卷持久化/创建表空间