scala - 如何将 Iterable 转换为 RDD

标签 scala hadoop apache-spark aggregate-functions rdd

更具体地说,我如何将 scala.Iterable 转换为 org.apache.spark.rdd.RDD

我有一个 (String, Iterable[(String, Integer)]) 的 RDD 我希望将其转换为 (String, RDD[String, Integer])RDD,以便我可以将 reduceByKey 函数应用于内部 RDD.

例如 我有一个 RDD,其中键是人名的 2 个字母前缀,值是人名和他们在事件中花费的时间对的列表

我的 RDD 是:

("To", List(("Tom",50),("Tod","30"),("Tom",70),("Tod","25"),("Tod",15) ) ("Ja", List(("Jack",50),("James","30"),("Jane",70),("James","25"),("Jasper",15) )

我需要将列表转换为 RDD,以便我可以使用累积每个人花费的总小时数。应用 reduceByKey 并使结果为 ("To", RDD(("Tom",120),("Tod","70")) ("Ja", RDD(("Jack",120),("James","55"),("Jane",15))

但是我找不到任何这样的转换函数。我怎样才能做到这一点 ?

提前致谢。

最佳答案

您可以使用 flatMapreduceByKey 来实现这一点。像这样:

rdd.flatMap{case(key, list) => list.map(item => ((key,item._1), item._2))}
   .reduceByKey(_+_)
   .map{case((key,name),hours) => (key, List((name, hours)))}
   .reduceByKey(_++_)

关于scala - 如何将 Iterable 转换为 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37208871/

相关文章:

hadoop - 通过oozie从sqoop作业中进行增量导入不会更新sqoop的元存储中的增量.last.value

hadoop - 如何开始使用 Hadoop

apache-spark - 为在同一台机器上运行的多个执行程序导出 spark 执行程序 jmx 指标

scala - 通过Spark Dataframe中数组结构中的最后一项删除重复的数组结构

java - 如何在 Java/Scala 中中断提交给 newSingleThreadExecutor 的线程?

android - android 上的远程 akka Actor ?

scala - 有人使用securesocial实现了死锁或任何其他授权机制吗?

python - 监控节点集群

apache-spark - Pyspark 如何从 word2vec 词嵌入计算 Doc2Vec?

scala - uPickle 和 ScalaJS : sealed trait serialisation