python - Spark/Python,reduceByKey() 然后找到前 10 个最常见的单词和频率

标签 python apache-spark pyspark tuples rdd

我有一个使用 Hadoop + Spark 的 VirtualMachine 设置,我正在从我的 HDFS 中读取一个文本文件“words.txt”,然后调用 map()、flatmap()、reduceByKey() 并尝试获取前 10 个最常见的单词及其出现。我已经完成了大部分代码,然后聚合了元组列表,但我只需要一种方法来找到前 10 个。我知道我需要简单地遍历元组中的值(键是实际的 str 词,但值是该词在 words.txt 文件中出现的次数的整数)并且只需要一个计数器来计算顶部10. (K,V) 值对是 Key = words.txt 中的单词,Value = 文件中出现次数的整数聚合值。下面这张截图是在调用 reduceByKey() 之后,你可以看到 'the' 出现了 40 次(右边是屏幕截图的结尾)

这是输出: enter image description here

到目前为止,这是我的代码:

from pyspark import SparkcConf, SparkContext

# Spark set-up
conf = SparkConf()
conf.setAppName("Word count App")
sc = SparkContext(conf=conf)

# read from text file words.txt on HDFS
rdd = sc.textFile("/user/spark/words.txt")

# flatMap() to output multiple elements for each input value, split on space and make each word lowercase
rdd = rdd.flatMap(lamda x: x.lower().split(' '))

# Map a tuple and append int 1 for each word in words.txt
rdd = rdd.map(lamda x: (x,1))

# Perform aggregation (sum) all the int values for each unique key)
rdd = rdd.reduceByKey(lamda x, y: x+y)

# This is where I need a function or lambda to sort by descending order so I can grab the top 10 outputs, then print them out below with for loop

# for item in out:
print(item[0], '\t:\t', str(item[1]))

我知道我通常只会创建一个名为“max”的变量,并且只有在列表或元组中找到最大值时才更新它,但让我感到困惑的是我正在处理 Spark 和 RDD,所以我一直在错误,因为我对 RDD 在执行 map、flatmap、reduceByKey 等操作时返回的内容有些困惑...

非常感谢任何帮助

最佳答案

你可以在 reduce 之后反转 K,V 这样你就可以使用 sortByKey 函数:

rdd.map(lambda (k,v): (v,k)).sortByKey(False).take(10)

对于 Python 3:(因为不再支持在 lambda 表达式中解包元组)

rdd.map(lambda x: (x[1], x[0])).sortByKey(False).take(10)

关于python - Spark/Python,reduceByKey() 然后找到前 10 个最常见的单词和频率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59240504/

相关文章:

python - 从字符串中删除字母

python - 什么是 pandas 数据框标签?

python - scipy 稀疏矩阵作为 petsc4py 的输入

apache-spark - Spark命令中的--driver-class-path有什么用?

python - 如何从文件中读取数据并将其传递给 Spark/PySpark 中的 FPGrowth 算法

data-structures - Python:修改列表元素的正确方法是什么?

scala - 仅当在 main 方法之外定义 case 类以创建 Dataset[case class] 或 Dataframe[case class] 时才工作

python - 在python中每次迭代时分别获取函数的日志

python - 获取 Spark RDD 中每个键的最大值

azure - Spark作业最后无法删除其临时文件夹