apache-spark - 如何让 Spark Streaming 写入它的输出以便 Impala 可以读取它?

标签 apache-spark hadoop hive spark-streaming impala

我在使用 Spark Streaming API 时遇到以下问题。我目前正在通过 Flume 将输入数据流式传输到 Spark Streaming,我计划用它对数据进行一些预处理。然后,我想把数据保存到Hadoop的文件系统中,用Impala查询。但是,Spark 将数据文件写入单独的目录,并为每个 RDD 生成一个新目录。

这是一个问题,因为首先,Impala 中的外部表无法检测到子目录,只能检测到它们指向的目录内的文件,除非已分区。其次,Spark 添加新目录的速度如此之快,以至于在 Impala 中为每个生成的目录定期创建一个新分区对性能来说非常糟糕。另一方面,如果我选择增加 Spark 中写入的滚动间隔,这样目录的生成频率就会降低,那么在 Impala 可以读取传入数据之前会有额外的延迟。这是 Not Acceptable ,因为我的系统必须支持实时应用程序。在 Hive 中,我可以通过使用这些设置配置外部表来检测子目录而无需分区:

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

但据我了解,Impala 没有这样的功能。

我目前正在使用以下代码从 Flume 读取数据并将其写入 HDFS:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

这里,变量路径决定了目录的前缀,文本文件(part-0000等)被添加到目录中,目录名的其余部分是Spark生成的时间戳。我可以将代码更改为如下所示:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

在这种情况下,文件将被添加到由路径确定的相同目录,但由于它们总是被命名为 part-00000、part-00001、part-00002 等,之前生成的文件将被覆盖。在检查 Spark 的源代码时,我注意到文件的名称是由 SparkHadoopWriter 的 open() 方法中的一行确定的:

val outputName = "part-"  + numfmt.format(splitID)

在我看来,没有办法通过 Spark API 来操作 splitID。总而言之,我的问题如下:

  • 有什么方法可以让Impala中的外部表检测到子目录吗?
  • 如果没有,是否有任何方法可以让 Spark 将其输出文件写入单个目录或以其他方式写入 Impala 可立即读取的格式?
  • 如果没有,Spark 是否需要任何类型的更新来解决这个问题,或者我应该只分支我自己的 Spark 版本,我可以用它来决定它自己编写的文件的名称?

最佳答案

我不能代表 Impala。

part-xxxxx 是 Spark 遵循的 hadoop 约定。大多数工具都理解这种格式,我猜想 Spark 对此无能为力。零件文件需要是唯一的,在文件名后附加分区号是一种常用技术。

我会在 Impala 中查看如何读取部分文件,因为大多数 hadoop 工具都是以这种方式生成的。

如果有人想自定义目录结构 - 尽管这不是您的问题 - 可以很容易地实现,比如更改 prefix-timestamp-suffix 格式。 Spark Steaming 在底层使用 Spark 的 RDD.saveAsTextFiles(..),它可以被定制。这是来自 DStream.scala 的代码:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

关于apache-spark - 如何让 Spark Streaming 写入它的输出以便 Impala 可以读取它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24204656/

相关文章:

hadoop - 将 Spark 设置为 Hive 的默认执行引擎

java - 在 Linux Fedora Hyper-V 虚拟机上独立启动 HBase 时出错

hadoop - 使用 NIFI 从 Kafka 插入到 Cassandra

csv - 在 beeline hive 中导出为 csv

linux - 使用 hive -e '<hive command>' 执行配置单元加载命令

python - 在 hive 或 pyspark 中透视日志

scala - 创建 Spark 存在用户定义的函数,其工作方式类似于 Scala Array#exists 函数

sql - 将 Hive 查询推送到数据库级别

scala - Spark 数据框将列值获取到字符串变量中

hadoop - 节点管理器是否在每个 DataNode 内部执行 Map 和 Reduce 阶段?