我想在spark中进行字数统计,我使用spark sql创建了一个rdd来从数据集中提取不同的推文。 我想在 RDD 之上使用 split 函数,但它不允许我这样做。
错误:- 值 split 不是 org.apache.spark.sql.SchemaRdd 的成员
Spark 代码无法进行字数统计:-
val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))
//tried split on both the rdd disnt worked
distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
但是当我将 Sparksql 中的数据输出到文件并再次加载并运行 split 时,它就可以工作了。
有效的示例代码:-
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")
我需要一个答案,而不是写入文件并再次加载来执行我的分割函数是否有解决方法可以使分割函数工作
谢谢
最佳答案
首先,我们不应该先执行collect(),然后并行化创建RDD;这会让司机忙碌/沮丧。
相反,
val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)
[考虑到这一点,您在查询中仅选择单个列 - distinct(text)
]
现在distinct_tweets_op只是一个RDD。
所以,循环这个 RDD;您可以在该 RDD 中的每个字符串上应用 split("") 函数。
关于java - Spark模式rdd到RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30412496/