hadoop - 将kafka的Spark批量流式传输到单个文件中

标签 hadoop apache-spark apache-kafka spark-streaming

我正在使用批处理流 (maxRatePerPartition 10.000) 从 Kafka 流式传输数据。因此,在每批处理中,我处理 10.000 条 kafka 消息。

在这个批处理运行中,我通过从 rdd 中创建一个数据帧来处理每条消息。处理后,我使用以下方法将每个处理过的记录保存到同一个文件:dataFrame.write.mode(SaveMode.append)。 因此它将所有消息附加到同一个文件。

只要它在一个批处理运行中运行就可以。但是在执行下一个批处理运行(处理下 10.000 条消息)后,它会为下一个 10.000 条消息创建一个新文件。

现在的问题是:每个文件( block )保留文件系统的 50mb,但只包含大约 1mb(10.000 条消息)。 与其在每次批处理运行时都创建新文件,我宁愿将其全部附加到一个文件中,只要它不超过 50mb。

您知道如何执行此操作或为什么它在我的示例中不起作用吗?您可以在这里查看我的编码:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.Set


object SparkStreaming extends Constants {


  def main(args: Array[String]) {

//create a new Spark configuration...
val conf = new SparkConf()
  .setMaster("local[2]") // ...using 2 cores
  .setAppName("Streaming")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")  //... processing max. 10000 messages per second

//create a streaming context for micro batch
val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above.)

//Setup up Kafka DStream
val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
  "auto.offset.reset" -> "smallest") //Start from the beginning
val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)

val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
  kafkaParams, kafkaTopics)

val records = directKafkaStream.map(Source => StreamingFunctions.transformAvroSource(Source))


records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton
  import sqlContext.implicits._

  val dataFrame = rdd.toDF()

  dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
  println(s"Written entries: ${dataFrame.count()}")
}
)


//start streaming until the process is killed
ssc.start()
ssc.awaitTermination()

  }


  /** Case class for converting RDD to DataFrame */
  case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)


  /** Lazily instantiated singleton instance of SQLContext */
  object SQLContextSingleton {

@transient private var instance: SQLContext = _

def getInstance(sparkContext: SparkContext): SQLContext = {
  if (instance == null) {
    instance = new SQLContext(sparkContext)
  }
  instance
    }
  }

}

我很乐意听取您的意见。 谢谢,亚历克斯

最佳答案

这可以通过使用 coalesce 函数然后覆盖现有文件来完成。

但正如线程中所讨论的那样 Spark coalesce looses file when program is aborted程序中断时会报错。

所以目前看来实现这样的逻辑还不够。

关于hadoop - 将kafka的Spark批量流式传输到单个文件中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33346660/

相关文章:

java - 当 hdfs 目录中创建文件时如何收到通知

scala - 如何使用toDF创建带有空值的DataFrame?

apache-spark - 有没有办法在不指定下限和上限的情况下从数据库(Oracle)读取数据(spark.read.jdbc)时指定分区数?

linux - kafka logs + 如何限制日志大小

java - 似乎无法将 KStream<A,B> 转换为 KTable<X,Y>

hadoop - 如何找到生成的MapTasks数量?

python - 对 python 输入文件参数和标准输入流使用两个管道

hadoop - 百分位数函数在 Hive 中如何工作?

java - 具有多个上下文的 Spark 作业失败

python - 在 python 中使用来自不同容器的 Kafka 消息