scala - Spark 流 : Write Data to HDFS by reading from one HDFSdir to another

标签 scala apache-spark hadoop hdfs

我正在尝试使用 Spark Streaming 将数据从一个 HDFS 位置读取到另一个位置

下面是我在 spark-shell 上的代码片段

但我看不到在 HDFS 输出目录上创建的文件 能否指出如何在 HDFS 上加载文件

  scala> sc.stop()

  scala> import org.apache.spark.SparkConf

  scala> import org.apache.spark.streaming

  scala> import org.apache.spark.streaming.{StreamingContext,Seconds}

  scala> val conf = new SparkConf().setMaster("local[2]").setAppName("files_word_count")

  scala> val ssc = new StreamingContext(conf,Seconds(10))

  scala> val DF = ssc.textFileStream("/user/cloudera/streamingcontext_dir")
  scala> val words_freq = DF.flatMap(x=>(x.split(" "))).map(y=>(y,1)).reduceByKey(_+_)

  scala>    words_freq.saveAsTextFiles("hdfs://localhost:8020/user/cloudera/streamingcontext_dir2")

  scala> ssc.start()

我已将文件放在 HDFS“/user/cloudera/streamingcontext_dir”上并创建了另一个目录“/user/cloudera/streamingcontext_dir2”以查看写入的文件

但我在输出目录中看不到文件 有人可以指出这里出了什么问题吗?

谢谢 提交

最佳答案

尝试在这里使用 RDD 而不是整个 DStream:

words_freq.foreachRDD(rdd => 
rdd.saveAsTextFile("hdfs://localhost:8020/user/cloudera/streamingcontext_dir2")

关于scala - Spark 流 : Write Data to HDFS by reading from one HDFSdir to another,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53886293/

相关文章:

scala - 如何在 Scala 的主构造函数中定义局部 var/val?

java - 将结果集转换为数据框

linux - Bash:将 linux 中的文件拆分为 10 个仅由空行组成的文件

java - 组织.apache.hadoop.mapreduce.counters.LimitExceededException : Too many counters: 121 max=120

Scala XML 文字 - bool 值与字符串

scala - 如何使用带有自定义 UDF 的 DataFrame.explode 将字符串拆分为子字符串?

scala - 尝试用两列 [Seq(), String] 创建数据框 - Spark

apache-spark - Spark : Difference between numPartitions in read. jdbc(..numPartitions..) 和 repartition(..numPartitions..)

hadoop - Apache Drill - 不在 Hive DB 中列出表

apache - 使用 APACHE Web 服务器、Linux CentOS 访问 HDFS HADOOP