我正在学习 Spark(在 Scala 中)并且一直试图弄清楚如何计算文件每一行上的所有单词。
我正在处理一个数据集,其中每一行都包含一个制表符分隔的 document_id 和文档的全文
doc_1 <full-text>
doc_2 <full-text>
etc..
这是我在一个名为 doc.txt 的文件中的玩具示例
doc_1 new york city new york state
doc_2 rain rain go away
我想我需要做的是转换成包含的元组
((doc_id, word), 1)
然后调用 reduceByKey() 对 1 求和。我写了以下内容:
val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
.map( x => (x(1).split("\\s+")
.map(y => ((x(0), y), 1 )) ) )
这确实给了我我认为我需要的中间表示:
tuples.collect
res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))
但是如果在元组上调用 reduceByKey 它会产生一个错误
tuples.reduceByKey(_ + )
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
tuples.reduceByKey(_ + )
我似乎无法理解如何做到这一点。我想我需要对数组内的数组进行缩减。我尝试了很多不同的东西,但像上面一样不断出错并且没有任何进展。
对此的任何指导/建议将不胜感激。
注意:我知道 https://spark.apache.org/examples.html 上有一个字数统计的例子展示如何获取文件中所有单词的计数。但那是针对整个输入文件的。我说的是获取每个文档的计数,其中每个文档都在不同的行上。
最佳答案
reduceByKey
预计类型 RDD[(K,V)]
而在您执行 split
的那一刻在第一 map
,你最终会得到一个 RDD[Array[...]]
,这不是所需的类型签名。您可以按如下方式重新设计当前的解决方案......但它可能不会那么高效(在使用 flatMap
重新工作的代码之后阅读):
//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Split the data on tabs to get an array of (key, line) tuples
val firstPass = file.map(_.split("\t"))
//Split the line inside each tuple so you now have an array of (key, Array(...))
//Where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1))))
//Now group the words and re-map so that the inner tuple is the wordcount
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))
可能是更好的解决方案 vvvv :
如果要保留当前结构,则需要更改为使用
Tuple2
从一开始然后使用 flatMap
后://Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))
关于scala - Spark 中的每文档字数统计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28270635/