JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
我的HDFS目录包含json文件
最佳答案
您可以使用textFileStream
将其作为文本文件读取并稍后进行转换。
val dstream = ssc.textFileStream("path to hdfs directory")
这为您提供了DStream[Strings]
,它是RDD[String]
然后你可以得到每个时间间隔的RDD
dstream.foreachRDD(rdd => {
//now apply a transformation or anything with the each rdd
spark.read.json(rdd) // to change it to dataframe
})
scc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
希望这有帮助
关于java - 如何使用spark Streaming从HDFS读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49228882/