scala - Spark : How to use mapPartition and create/close connection per partition

标签 scala apache-spark rdd

因此,我想在我的 Spark DataFrame 上执行某些操作,将它们写入数据库并在最后创建另一个 DataFrame。它看起来像这样:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()

这给了我一个错误,因为mapPartitions期望返回类型为Iterator[NotInferedR],但这里是Unit。我知道这对于 forEachPartition 是可能的,但我也想进行映射。单独进行会产生开销(额外的 Spark 工作)。该怎么办?

谢谢!

最佳答案

在大多数情况下,如果不减慢作业速度,急切地使用迭代器将导致执行失败。因此,我所做的是检查迭代器是否已为空,然后执行清理例程。

rdd.mapPartitions(itr => {
    val conn = new DbConnection
    itr.map(data => {
       val yourActualResult = // do something with your data and conn here
       if(itr.isEmpty) conn.close // close the connection
       yourActualResult
    })
})

一开始我认为这是一个 Spark 问题,但实际上是一个 scala 问题。 http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean

关于scala - Spark : How to use mapPartition and create/close connection per partition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36545579/

相关文章:

scala - Scala 中的全局变量

scala - Spark 将 sql 窗口函数迁移到 RDD 以获得更好的性能

scala-library 导入了两次?

Scala - 调用具有泛型类型参数的方法,给定确定正确类型的字符串值

apache-spark - Apache Spark : Yarn logs Analysis

scala - 在 scala 中,我们如何聚合数组来确定每个键的计数以及百分比与总数

apache-spark - Spark RDD 操作类似于 top 返回一个较小的 RDD

apache-spark - 如何使用Spark将输出作为单独的文件写入现有HDFS目录下?

python - 将pyspark偏移滞后动态值检索到其他数据帧

csv - 将唯一的连续行号添加到 pyspark 中的数据框