scala - 用于激发 Dataframe 的结果集

标签 scala apache-spark spark-dataframe resultset

我正在查询mysql表

val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"
val driver = "com.mysql.jdbc.Driver"
val username = "XXX"
val password = "XXX"
var connection:Connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val patnerName = statement.executeQuery("SELECT id,name FROM partner")

我确实在 patnerName 中得到了我的结果,但我需要转换为 Dataframe。

我可以通过下面的代码打印数据:

while (patnerName.next) {
  val id = patnerName.getString("id")
  val name = patnerName.getString("name")
  println("id = %s, name = %s".format(id,name))
}

现在如何将 patnerName 转换为 DataFrame?

最佳答案

因此您必须分几步完成:

  1. 定义列并准备架构
    val columns = Seq("id", "name")
    val schema = StructType(List(
      StructField("id", StringType, nullable = true),
      StructField("name", StringType, nullable = true)
    ))
  1. 定义在每次迭代时如何将 ResultSet 中的每条记录转换为一行
    def parseResultSet(rs: ResultSet): Row = {
      val resultSetRecord = columns.map(c => rs.getString(c))
      Row(resultSetRecord:_*)
    }
  1. 定义一个函数,将您的 ResultSet 转换为 Iterator[Row]。它将使用您在上一步中定义的函数(当您在下一步中调用它时)。
    def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
      new Iterator[Row] {
        def hasNext: Boolean = rs.next()
        def next(): Row = f(rs)
      }
  1. 定义一个函数,从 Iterator[Row].toSeq 中创建一个 RDD,它使用您在上一步中定义的函数。使用模式从 RDD 创建 DataFrame
    def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
      val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
      spark.createDataFrame(rdd, schema) // use the schema you defined in step 1
    }
  1. 最后调用你的函数
    val df: DataFrame = parallelizeResultSet(patner, spark)

关于scala - 用于激发 Dataframe 的结果集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41117750/

相关文章:

scala - 创建简单项目 SBT 0.10.X

android - 如何将 Scala 集成到核心 Android 平台中?

scala - 在非尾递归函数中使用 Scala 内部函数时是否会降低效率?

scala - Scala 中非 boolean 类型的逻辑运算符

java - Spark SQL - 选择所有和计算列?

hadoop - 提交 Spark 的工作绩效

apache-spark - 将数据帧写入 Spark 集群上的文件的速度非常慢

scala - 星火笔记本 : How can I filter rows based on a column value where each column cell is an array of strings?

python - pyspark:返回不完整的 URI 错误

python - pyspark:合并(外连接)两个数据框