scala - 循环遍历 Map Spark Scala

标签 scala csv apache-spark twitter dataset

在此代码中,我们有两个文件:包含姓名的 Athens.csv 和包含推文消息的 twitter.test。我们希望找到 twitter.test 中与运动员.csv 中的名称相匹配的每一行的名称。我们应用映射函数来存储运动员.csv 中的名称,并希望将所有名称迭代到测试中的所有行文件。

object twitterAthlete {

  def loadAthleteNames() : Map[String, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Create a Map of Ints to Strings, and populate it from u.item.
    var athleteInfo:Map[String, String] = Map()
    //var movieNames:Map[Int, String] = Map() 
     val lines = Source.fromFile("../athletes.csv").getLines()
     for (line <- lines) {
       var fields = line.split(',')
       if (fields.length > 1) {
        athleteInfo += (fields(1) -> fields(7))
       }
     }

     return athleteInfo
  }

  def parseLine(line:String): (String)= {
    var athleteInfo = loadAthleteNames()
    var hello = new String
    for((k,v) <- athleteInfo){
      if(line.toString().contains(k)){
        hello = k
      }
    }
    return (hello)
  }


  def main(args: Array[String]){
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "twitterAthlete")

    val lines = sc.textFile("../twitter.test")
    var athleteInfo = loadAthleteNames()

    val splitting = lines.map(x => x.split(";")).map(x => if(x.length == 4 && x(2).length <= 140)x(2)) 

    var hello = new String()
    val container = splitting.map(x => for((key,value) <- athleteInfo)if(x.toString().contains(key)){key}).cache


    container.collect().foreach(println)  

   // val mapping = container.map(x => (x,1)).reduceByKey(_+_)
    //mapping.collect().foreach(println)
  }
}

第一个文件如下所示:

id,name,nationality,sex,height........  
001,Michael,USA,male,1.96 ...
002,Json,GBR,male,1.76 ....
003,Martin,female,1.73 . ...

第二个文件如下所示:

time, id , tweet .....
12:00, 03043, some message that contain some athletes names  , .....
02:00, 03023, some message that contain some athletes names , .....

有些人是这样想的......

但是运行此代码后我得到了空结果,非常感谢任何建议

我得到的结果是空的:

()....
()...
()...

但我期望的结果是这样的:

(name,1)
(other name,1)

最佳答案

您需要使用yield将值返回到您的map

 val container = splitting.map(x => for((key,value) <- athleteInfo ; if(x.toString().contains(key)) ) yield (key, 1)).cache

关于scala - 循环遍历 Map Spark Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47926525/

相关文章:

Scala 测试所有实现的最终配置?

php - 如何检测上传的csv文件的编码

apache-spark - 使用 AWS Glue 作业在 Redshift 中导入数据时添加时间戳列

java - 如何在数据框中动态地从列表中选择列加上固定列

csv - JMeter - 根据属性读取 CSV

apache-spark - 如何在 Spark-Shell 中运行时添加 Hive 属性

scala - 猫缺少 Intersperse?

scala - scala 中的最小最大标准化

xml - 如何安全地处理 Scala 中的 unicode 用户输入(尤其是 XML 实体)

python - pd.read_csv 默认情况下将整数视为 float