hadoop - 尝试在Apache Spark中持久存储到数据库时RDD不起作用

标签 hadoop apache-spark apache-spark-sql rdd

我想将rdd保留到mysql数据库表中。我使用了一个map函数来遍历RDD,并将每个元组传递给我的函数,在其中进行持久化。在这里,我想将我的工作分为主节点和从节点。

但是它不能正常工作,也没有调用使数据库持久化的函数。

如果我使用courseSet.collect().map(m => sendCourseInfo(m))之类的collect()而不是courseSet.map(m => sendCourseInfo(m)),则可以正常工作。

我不想在这里使用collect()。

我在许多文章中都对此进行了搜索,但无法弄清楚。谁能帮我解决这个问题。

下面是我的代码,

 .....
  x.toString().split(",")(1),
  x.toString().split(",")(2),
  x.toString().split(",")(3)))

 courseSet.map(m => sendCourseInfo(m))
}

def sendCourseInfo(courseData: (Int, String, String, String)): Unit = {
    try {
      DatabaseUtil.setJDBCConfiguration()

      val jdbcConnection: java.sql.Connection = DatabaseUtil.getConnection

      val statement = "{call insert_course (?,?,?,?)}"
      val callableStatement = jdbcConnection.prepareCall(statement)
      callableStatement.setInt(1, courseData._1)
      callableStatement.setString(2, courseData._2)
      callableStatement.setString(3, courseData._3)
      callableStatement.setString(4, courseData._4)

      callableStatement.executeUpdate
    } catch {
      case e: SQLException => println(e.getStackTrace)
    }
}

最佳答案

您正在RDD上调用map(),这是一个转换而不是一个 Action 。因此,要执行该命令,您需要调用一些操作,例如,

courseSet.foreach(sendCourseInfo)

有关您正在做什么的其他建议,

不管x是什么,您都将其强制转换为String,将其拆分并从此拆分中提取一些内容。您需要对RDD / Collection中的每个元素进行三次。因此,您可以使用类似的方法对其进行优化,
x.map(_.toString.split(",")).map(x=>(x(1),x(2),x(3)))

接下来,您必须将此数据持久保存在数据库(在这种情况下为MySql)中。为此,您使用的是Java通常的jdbc连接,为每个元素创建一个新的连接和操作。相反,使用Spark 2.x做类似的事情,
import org.apache.spark.sql.SparkSession
import java.util.Properties

...

case class TableSchema(col1:Int,col2:String,col3:String,col4:String)
val props = new Properties()

def main(args: Array[String]): Unit = {

val ss = SparkSession.builder.appName("Test").master("local[*]").getOrCreate()

import ss.implicits._

...

props.setProperty("username", "username")
props.setProperty("password", "password")    

val df = rdd.map(_.toString.split(",")).map(x=>TableSchema(x(0),x(1),x(2),x(3))).toDF()

df.write.jdbc(s"jdbc:mysql://${mysqlHost}/${mysqlDBName}", "tablename", props)

}

让我知道这是否有帮助,干杯。

关于hadoop - 尝试在Apache Spark中持久存储到数据库时RDD不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45319588/

相关文章:

apache-spark - 自定义 Spark 聚合器返回行

hadoop - 无法在 Mahout.0.13 中添加类 : org. apache.mahout.classifier.df.mapreduce.BuildForest

apache-spark - 如何在Spark SQL中为表添加增量列ID

java - 在SparkSession.SQL中执行Prepared和Batch语句

Java Spark-如何以多列作为参数调用 UDF

python - 将文本文件读取到元组pyspark

apache-spark - Spark 1.4 : Spark SQL ANY and ALL functions

hadoop - 无法访问 YARN 作业历史记录

hadoop - 使用 Sqoop 导入数据,时间戳数据类型

Hadoop 命令