linux - 如何在 Spark 中编写一个独立应用程序,以在充满提取推文的文本文件中查找 20 个最常提及的内容

标签 linux scala apache-spark

我正在 Spark 中创建一个独立的应用程序,我需要在其中读取包含推文的文本文件。每次提及都以符号“@”开头。目标是浏览此文件并找到最多 20 次提及的内容。所有提及中都应去掉标点符号,如果推文多次提及相同内容,则应仅计算一次。一条推文中可以有多个唯一提及。文件中有很多推文。

我是 scala 和 apache-spark 的新手。我正在考虑使用过滤功能并将结果放入列表中。然后将列表转换为项目唯一的集合。但语法、正则表达式和读取文件是我面临的问题。

def main(args: Array[String]){
   val locationTweetFile = args(0)
   val spark = SparkSession.builder.appName("does this matter?").getOrCreate()

推文文件很大,下面的命令安全吗?

val tweetsFile = spark.read.textFile(locationTweetFile).cache()
val mentionsExp = """([@])+""".r

}

如果推文说 “嘿@Honda,我是@customer,我爱@honda。我最喜欢@CUSTOMER。” 那么输出应该类似于 ((honda, 1),(customer,1))

由于有多条推文,另一条推文可以说: “@HoNdA,我也是@cuSTomER @STACKEXCHANGE。” 那么最终的输出将类似于 ((本田,2),(客户,2),(stackexchange,1))

最佳答案

让我们一步一步来。

1) appName("这重要吗?") 在你的情况下并不重要

2) spark.read.textFile(filename) 由于其惰性而安全,文件不会加载到内存中

现在,关于实现:

Spark 涉及数据转换,因此您需要考虑如何将原始推文转换为每条推文中唯一提及的列表。接下来,您将提及列表转换为 Map[Mention, Int],其中 Int 是 RDD 中该提及的总数。

转换通常通过 map(f: A => B) 方法完成,其中 f 是将 A 值映射到 B 的函数。

def tweetToMentions(tweet: String): Seq[String] =
  tweet.split(" ").collect {
    case s if s.startsWith("@") => s.replaceAll("[,.;!?]", "").toLowerCase
  }.distinct.Seq

val mentions = tweetToMentions("Hey @Honda, I am @customer I love @honda. I am favorite @CUSTOMER.")
// mentions: Seq("@honda", "@customer")

下一步是将此函数应用于 RDD 中的每个元素:

val mentions = tweetsFile.flatMap(tweetToMentions)

请注意,我们使用 flatMap 而不是 map,因为 tweetToMentions 返回 Seq[String] 并且我们希望我们的 RDD 仅包含提及项,flatMap 会将结果展平。

为了计算 RDD 中每个提及的出现次数,我们需要应用一些魔法:

首先,我们将提及内容映射到成对的(Mention, 1)

mentions.map(mention => (mention, 1))

然后我们使用reduceByKey来计算每个提及在我们的RDD中出现的次数。最后,我们按提及次数排序并检索结果。

val result = mentions
  .map(mention => (mention, 1))
  .reduceByKey((a, b) => a + b)
  .takeOrdered(20)(Ordering[Int].reverse.on(_.2))

关于linux - 如何在 Spark 中编写一个独立应用程序,以在充满提取推文的文本文件中查找 20 个最常提及的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55522865/

相关文章:

scala - 如何声明必须返回其参数之一的函数的签名? (任何语言*)

c - 在C中的while循环内部和外部返回一个值

linux 在列内排序

scala - scalaz 中的笛卡尔积遍历

scala - Scala 中如何确定类型参数的子类型?

apache-spark - 由于任务积压而请求执行者

java - Spark Java语言加载数据集的最佳方式

hadoop - 如何在Spark而不是RAM的磁盘上进行计算?

linux - 将时间跨度(以秒为单位)转换为 shell 中的格式化时间

Linux:随机自动截图