scala - Spark Scala 从 rdd.foreachPartition 获取数据

标签 scala apache-spark spark-streaming scalikejdbc

我有一些这样的代码:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

代码输出(经过大量编辑)如下:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void

问题 1:
如何使 lastRevs 值采用有用的格式,例如 JSON 字符串/null 或 Some/None 之类的选项?

问题2:
我的偏好:是否有另一种方法可以获取类似 RDD 格式(而不是迭代器格式)的分区数据?
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}

来自 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

问题 3:获取数据的方法是否我正在使用理智的方法(假设我正在关注上面的链接)? (撇开现在这是一个 scalikejdbc 系统 JDBC 的事实不谈。这将是一个键,这个原型(prototype)以外的某种类型的值存储。)

最佳答案

要创建使用执行器本地资源(例如数据库或网络连接)的转换,您应该使用 rdd.mapPartitions .它允许在本地初始化一些代码到执行器,并使用这些本地资源来处理分区中的数据。

代码应如下所示:

 val lastRevs = distinctFileGidsRDD.
        mapPartitions{iter => 
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          iter.map{ element => 
            DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
              .map { resultSet => resultSet.string(1) }.single.apply()
            }
          }
        }

关于scala - Spark Scala 从 rdd.foreachPartition 获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36951207/

相关文章:

apache-spark - Yarn Heap 使用量随时间增长

multithreading - 运行 Scala 线程

java - 我可以在 Java 代码中使用 Scala 标准库吗?

scala - 在 Slurm 上运行 Spark

python - 如何将外部 python 库添加到 HDFS?

java - Spark Streaming : Using PairRDD. saveAsNewHadoopDataset函数保存数据到HBase

scala - akka actor适用于搜索等在线服务系统吗?

scala - Spark UDAF - 使用泛型作为输入类型?

scala - 自动 Spark RDD 分区缓存驱逐何时实现?

java - Spark Streaming Kafka - 当 RDD 包含实际消息时作业总是退出