java - Spark模式rdd到RDD

标签 java scala apache-spark apache-spark-sql pyspark

我想在spark中进行字数统计,我使用spark sql创建了一个rdd来从数据集中提取不同的推文。 我想在 RDD 之上使用 split 函数,但它不允许我这样做。

错误:- 值 split 不是 org.apache.spark.sql.SchemaRdd 的成员

Spark 代码无法进行字数统计:-

val disitnct_tweets=hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_List=sc.parallelize(List(distinct_tweets))

//tried split on both the rdd disnt worked

distinct_tweets.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

distinct_tweets_List.flatmap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

但是当我将 Sparksql 中的数据输出到文件并再次加载并运行 split 时,它就可以工作了。

有效的示例代码:-

val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.collect()
val rdd=sc.parallelize(distinct_tweets_op)
rdd.saveAsTextFile("/home/cloudera/bdp/op")
val textFile=sc.textFile("/home/cloudera/bdp/op/part-00000")
val counts=textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
counts.SaveAsTextFile("/home/cloudera/bdp/wordcount")

我需要一个答案,而不是写入文件并再次加载来执行我的分割函数是否有解决方法可以使分割函数工作

谢谢

最佳答案

首先,我们不应该先执行collect(),然后并行化创建RDD;这会让司机忙碌/沮丧。

相反,

val distinct_tweets=hiveCtx.sql("select dsitinct(text) from tweets_table where text <> ''")
val distinct_tweets_op=distinct_tweets.map(x => x.mkstring)    

[考虑到这一点,您在查询中仅选择单个列 - distinct(text)]

现在distinct_tweets_op只是一个RDD。

所以,循环这个 RDD;您可以在该 RDD 中的每个字符串上应用 split("") 函数。

关于java - Spark模式rdd到RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30412496/

相关文章:

java - 查找字符串中的字符在java中重复了多少次

java - 当设备在 Android 中水平放置时,为什么 ListView 内的嵌套自定义 View 会阻止正确聚焦?

python - Pyspark - 计算两个数据帧之间的日期

c# - 如何从 C# 中引用带有美元符号的标识符?

java - 如何使用Spark java从mariadb读取数据

java - 使用Java读取Spark中包含结构类型的CSV文件

java - 如何通过单击android中的按钮来执行两个操作

java - 按字母顺序对 ArrayList<String[]> 进行排序

scala - 数组 ("one", "two").mkString (":") 在方法返回 WrappedArray 中定义,而不是字符串

xml - 使用 Scala 将 XML 转换为系统属性