scala - Spark 中的每文档字数统计

标签 scala apache-spark

我正在学习 Spark(在 Scala 中)并且一直试图弄清楚如何计算文件每一行上的所有单词。
我正在处理一个数据集,其中每一行都包含一个制表符分隔的 document_id 和文档的全文

doc_1   <full-text>
doc_2   <full-text>
etc..

这是我在一个名为 doc.txt 的文件中的玩具示例
doc_1   new york city new york state
doc_2   rain rain go away

我想我需要做的是转换成包含的元组
((doc_id, word), 1)

然后调用 reduceByKey() 对 1 求和。我写了以下内容:
val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
            .map( x => (x(1).split("\\s+")
            .map(y => ((x(0), y), 1 ))   ) )

这确实给了我我认为我需要的中间表示:
tuples.collect

res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))

但是如果在元组上调用 reduceByKey 它会产生一个错误
tuples.reduceByKey(_ + )
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
              tuples.reduceByKey(_ + )

我似乎无法理解如何做到这一点。我想我需要对数组内的数组进行缩减。我尝试了很多不同的东西,但像上面一样不断出错并且没有任何进展。
对此的任何指导/建议将不胜感激。

注意:我知道 https://spark.apache.org/examples.html 上有一个字数统计的例子展示如何获取文件中所有单词的计数。但那是针对整个输入文件的。我说的是获取每个文档的计数,其中每个文档都在不同的行上。

最佳答案

reduceByKey预计类型 RDD[(K,V)]而在您执行 split 的那一刻在第一 map ,你最终会得到一个 RDD[Array[...]] ,这不是所需的类型签名。您可以按如下方式重新设计当前的解决方案......但它可能不会那么高效(在使用 flatMap 重新工作的代码之后阅读):

//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))  

//Split the data on tabs to get an array of (key, line) tuples
val firstPass = file.map(_.split("\t"))

//Split the line inside each tuple so you now have an array of (key, Array(...)) 
//Where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1)))) 

//Now group the words and re-map so that the inner tuple is the wordcount
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))

可能是更好的解决方案 vvvv :

如果要保留当前结构,则需要更改为使用 Tuple2从一开始然后使用 flatMap后:
//Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))

关于scala - Spark 中的每文档字数统计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28270635/

相关文章:

scala - 在Spark中使用复杂过滤从Elasticsearch获取esJsonRDD

python - Spark异常: Only one SparkContext may be running in this JVM (see SPARK-2243)

python - 为独立应用程序导入 pyspark

scala - 在一对多中, "where exists"如何在slick中表达?

eclipse - 如何将 sbteclipse 插件添加到 SBT 0.10.x

postgresql - spark 结构化流 PostgreSQL updatestatebykey

scala - 如何在智能IDEA中为Scala自定义代码折叠?

java - 更新错误 :org. elasticsearch.ElasticsearchIllegalArgumentException:执行脚本失败

java - IntelliJ 提示我使用的是 java 5 但实际上它是 Java 12

amazon-web-services - 使用 Pycharm 在 EMR 上调试 Pyspark