我对 scala 和 Spark 还很陌生。我正在从 kafka(真实)发送字符串消息到 Spark(本地),但如何访问它们?例如,我想要一个包含我所有消息的字符串列表。我最终只打印它们:
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, Set[String]("testTopic"))
directKafkaStream.print() //I can see it in console, but how to get my message string?
最佳答案
正如您在 API 中看到的那样,InputDStream 是 DStream 的子类。这意味着您可以访问 InputDStream,就像它只是一个 DStream 一样。
您可以通过迭代 RDD 来“获取”消息字符串,例如:
directKafkaStream.foreachRDD { rdd =>
rdd.foreach { content =>
// code to handle the string here
}
}
一般来说,这取决于您期望的类型(例如,自定义 Avro 记录),但在您处理字符串的情况下,将 rdd 视为 string[s] 的集合就足够了。
理论上,如果您想应用转换(例如,filter
,...),您甚至不需要使用单独的 .foreachRDD 迭代 RDD。例如,如果要过滤包含特定单词的所有字符串,可以使用:
val infoLines = directKafkaStream.filter { line =>
line.contains("INFO")
}
请注意,infoLines
仍将返回 DStream[String]
,因此您仍然会遇到相同的初始问题:如何访问单个字符串? 您需要了解 DStreams 和 RDD 是 Spark 和 Spark Streaming 使用的高级数据抽象 - 通常您将首先对那些具有转换的操作进行操作,然后应用操作(例如,saveAsTextFile
) - 很少有 println
语句。
但是,从你的问题来看,你似乎需要阅读一些关于Spark Streaming的文档:official documentation是一个很好的资源(尽管有些书籍,例如 Learning Spark,也可能有助于您理解)。
关于scala - 如何访问InputDStream数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34067920/