我正在 Spark 中创建一个独立的应用程序,我需要在其中读取包含推文的文本文件。每次提及都以符号“@”开头。目标是浏览此文件并找到最多 20 次提及的内容。所有提及中都应去掉标点符号,如果推文多次提及相同内容,则应仅计算一次。一条推文中可以有多个唯一提及。文件中有很多推文。
我是 scala 和 apache-spark 的新手。我正在考虑使用过滤功能并将结果放入列表中。然后将列表转换为项目唯一的集合。但语法、正则表达式和读取文件是我面临的问题。
def main(args: Array[String]){
val locationTweetFile = args(0)
val spark = SparkSession.builder.appName("does this matter?").getOrCreate()
推文文件很大,下面的命令安全吗?
val tweetsFile = spark.read.textFile(locationTweetFile).cache()
val mentionsExp = """([@])+""".r
}
如果推文说 “嘿@Honda,我是@customer,我爱@honda。我最喜欢@CUSTOMER。” 那么输出应该类似于 ((honda, 1),(customer,1))
由于有多条推文,另一条推文可以说: “@HoNdA,我也是@cuSTomER @STACKEXCHANGE。” 那么最终的输出将类似于 ((本田,2),(客户,2),(stackexchange,1))
最佳答案
让我们一步一步来。
1) appName("这重要吗?")
在你的情况下并不重要
2) spark.read.textFile(filename)
由于其惰性而安全,文件不会加载到内存中
现在,关于实现:
Spark 涉及数据转换,因此您需要考虑如何将原始推文转换为每条推文中唯一提及的列表。接下来,您将提及列表转换为 Map[Mention, Int]
,其中 Int
是 RDD 中该提及的总数。
转换通常通过 map(f: A => B)
方法完成,其中 f
是将 A
值映射到 B
的函数。
def tweetToMentions(tweet: String): Seq[String] =
tweet.split(" ").collect {
case s if s.startsWith("@") => s.replaceAll("[,.;!?]", "").toLowerCase
}.distinct.Seq
val mentions = tweetToMentions("Hey @Honda, I am @customer I love @honda. I am favorite @CUSTOMER.")
// mentions: Seq("@honda", "@customer")
下一步是将此函数应用于 RDD 中的每个元素:
val mentions = tweetsFile.flatMap(tweetToMentions)
请注意,我们使用 flatMap
而不是 map
,因为 tweetToMentions
返回 Seq[String]
并且我们希望我们的 RDD 仅包含提及项,flatMap
会将结果展平。
为了计算 RDD 中每个提及的出现次数,我们需要应用一些魔法:
首先,我们将提及内容映射
到成对的(Mention, 1)
mentions.map(mention => (mention, 1))
然后我们使用reduceByKey
来计算每个提及在我们的RDD中出现的次数。最后,我们按提及次数排序并检索结果。
val result = mentions
.map(mention => (mention, 1))
.reduceByKey((a, b) => a + b)
.takeOrdered(20)(Ordering[Int].reverse.on(_.2))
关于linux - 如何在 Spark 中编写一个独立应用程序,以在充满提取推文的文本文件中查找 20 个最常提及的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55522865/