scala - 如何访问InputDStream数据?

标签 scala apache-spark bigdata apache-kafka

我对 scala 和 Spark 还很陌生。我正在从 kafka(真实)发送字符串消息到 Spark(本地),但如何访问它们?例如,我想要一个包含我所有消息的字符串列表。我最终只打印它们:

val sc = new SparkContext(conf)
val ssc =  new StreamingContext(sc, Seconds(1))
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, Set[String]("testTopic"))
directKafkaStream.print() //I can see it in console, but how to get my message string? 

最佳答案

正如您在 API 中看到的那样,InputDStream 是 DStream 的子类。这意味着您可以访问 InputDStream,就像它只是一个 DStream 一样。

您可以通过迭代 RDD 来“获取”消息字符串,例如:

directKafkaStream.foreachRDD { rdd => 
  rdd.foreach { content => 
    // code to handle the string here
  }
}

一般来说,这取决于您期望的类型(例如,自定义 Avro 记录),但在您处理字符串的情况下,将 rdd 视为 string[s] 的集合就足够了。

理论上,如果您想应用转换(例如,filter,...),您甚至不需要使用单独的 .foreachRDD 迭代 RDD。例如,如果要过滤包含特定单词的所有字符串,可以使用:

val infoLines = directKafkaStream.filter { line =>
  line.contains("INFO")
}

请注意,infoLines 仍将返回 DStream[String],因此您仍然会遇到相同的初始问题:如何访问单个字符串? 您需要了解 DStreams 和 RDD 是 Spark 和 Spark Streaming 使用的高级数据抽象 - 通常您将首先对那些具有转换的操作进行操作,然后应用操作(例如,saveAsTextFile ) - 很少有 println 语句。

但是,从你的问题来看,你似乎需要阅读一些关于Spark Streaming的文档:official documentation是一个很好的资源(尽管有些书籍,例如 Learning Spark,也可能有助于您理解)。

关于scala - 如何访问InputDStream数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34067920/

相关文章:

scala - Scala 中模拟函数无​​法正确返回值类

dataframe - Spark Dataframe.cache() 更改源的行为

sql - Spark DataFrame中=== null和isNull之间的区别

mapreduce - Apache Spark 中的驱动程序何时不会运行某个操作?

hadoop - 我如何使用 apache mahout 实现 LDA?

scala - 动态字符串插值

scala - 将时间戳解析为 LocalDateTime Scala

apache-spark - 同时多个 Spark 应用程序,同一个 Jarfile... 作业处于等待状态

r - 将索引矩阵与其在 R 中的相应值交换的快速方法

scala - Akka Http HTTPS 配置导致 Chrome 返回 ERR_SSL_VERSION_OR_CIPHER_MISMATCH