scala - How to fix the error "value toDS is not a member of org.apache.spark.rdd.RDD"?

Tags: scala apache-spark elasticsearch

I wrote this code to send streaming data from Twitter to Elasticsearch, and I added all the necessary dependencies, but I have a problem with the toDS and saveToEs calls. Please help me solve it.
Here is my code:
```
package org.lansrod.visualisation

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.sql._ // brings saveToEs into scope on Datasets
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

object twitter {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("twitter")
    val ssc = new StreamingContext(conf, Seconds(5)) // Spark streaming context

    val ACCESS_TOKEN = "my access token"
    val ACCESS_SECRET = "my access secret"
    val CONSUMER_KEY = "my consumer key"
    val CONSUMER_SECRET = "my consumer secret"
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true).setOAuthConsumerKey(CONSUMER_KEY)
      .setOAuthConsumerSecret(CONSUMER_SECRET)
      .setOAuthAccessToken(ACCESS_TOKEN)
      .setOAuthAccessTokenSecret(ACCESS_SECRET)

    val auth = new OAuthAuthorization(cb.build) // get the authorization
    val tweets = TwitterUtils.createStream(ssc, Some(auth))

    // Output operations must be registered before the context starts.
    tweets.foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val caseClassDS = rdd.toDS() // fails: value toDS is not a member of RDD[twitter4j.Status]
      caseClassDS.saveToEs("spark/docs")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
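Independently of the compile error, saveToEs needs to know where Elasticsearch is listening; elasticsearch-hadoop defaults to localhost:9200. A minimal sketch of passing the connection settings through SparkConf, with placeholder host and port values:

```
import org.apache.spark.SparkConf

// es.nodes / es.port point elasticsearch-hadoop at the cluster;
// es.index.auto.create lets the first write create the target index.
val conf = new SparkConf()
  .setMaster("local[*]").setAppName("twitter")
  .set("es.nodes", "localhost")
  .set("es.port", "9200")
  .set("es.index.auto.create", "true")
```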
And my build.sbt is the following:
```
scalaVersion := "2.11.0"

val sparkVersion = "2.4.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "7.6.1",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming-twitter_2.11" % "1.6.1" exclude("org.twitter4j", "twitter4j"),
  "org.twitter4j" % "twitter4j-core" % "2.2.0",
  "org.twitter4j" % "twitter4j-stream" % "2.2.0",
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)
```

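As an aside, spark-streaming-twitter_2.11 version 1.6.1 is the old Spark 1.6 artifact; for Spark 2.x the Twitter receiver moved to Apache Bahir. A possible substitution, assuming the Bahir release matching your Spark version (verify on Maven Central):

```
// Apache Bahir ships the Twitter DStream receiver for Spark 2.x
// (version shown is an assumption; pick the release matching your Spark).
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
```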
Best Answer

Spark version < 2.x
toDS is available via sqlContext.implicits._:

```
import sqlContext.implicits._
val myrdd = testRDD.toDS()
```

Spark version >= 2.x

```
val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
val myrdd = testRDD.toDS()
```
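Note that even with spark.implicits._ in scope, toDS() only resolves when an implicit Encoder exists for the RDD's element type, and twitter4j.Status has none, which is why rdd.toDS() fails to compile in the question. The usual workaround is to map each status onto a case class (product types get an encoder for free) before calling toDS(). A minimal sketch under that assumption; the Tweet case class and the fields it extracts are illustrative, not part of the original code:

```
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._
import twitter4j.Status

// Illustrative case class: Spark derives an Encoder for product types automatically.
case class Tweet(id: Long, user: String, text: String)

def saveTweets(rdd: RDD[Status]): Unit = {
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Map the non-encodable Status objects onto the case class, then toDS() compiles.
  val ds = rdd.map(s => Tweet(s.getId, s.getUser.getScreenName, s.getText)).toDS()
  ds.saveToEs("spark/docs")
}
```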

Regarding scala - How to fix the error "value toDS is not a member of org.apache.spark.rdd.RDD"?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63291003/
