java - 如何使用spark Streaming从HDFS读取数据?

标签 java apache-spark hdfs spark-streaming

JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

我的HDFS目录包含json文件

最佳答案

您可以使用textFileStream将其作为文本文件读取并稍后进行转换。

val dstream = ssc.textFileStream("path to hdfs directory")

这为您提供了DStream[Strings],它是RDD[String]

的集合

然后你可以得到每个时间间隔的RDD

dstream.foreachRDD(rdd => {
  //now apply a transformation or anything with the each rdd
 spark.read.json(rdd) // to change it to dataframe
})

scc.start()             // Start the computation
ssc.awaitTermination()   // Wait for the computation to terminate

希望这有帮助

关于java - 如何使用spark Streaming从HDFS读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49228882/

相关文章:

python - 如何找到当前 spark 上下文中加载的所有文本文件?

Hadoop - 数据在复制到 HDFS 时自动平衡?

python - 在 Oozie 中运行 python 脚本时如何导入本地 python 模块?

java - 当之前的一些行在我自己的 Eclipse 编辑器中折叠时获取当前行中的文本

java - eclipse生成war文件时压缩哪些文件

java - 使用以 BlockingQueue<Runnable> 作为参数的构造函数创建的 ThreadPoolExecutor 如何将 Callables 排入队列?

apache-spark - Pyspark - 根据时间戳值加入时间戳窗口

java - 计算图中所有顶点的数量

apache-spark - 如何在 PySpark 中使用交叉验证提取平均指标

apache-spark - 如何使用结构化流检查点管理 HDFS 内存