hadoop - Hive 为 HDFS 中的每个插入创建多个小文件

标签 hadoop hive hdfs apache-kafka spark-streaming

下面已经实现了

  1. Kafka Producer 使用 Spark Streaming 从 Twitter 中提取数据。
  2. Kafka Consumer 将数据提取到 Hive 外部表(在 HDFS 上)。

虽然到目前为止一切正常。 我只面临一个问题,当我的应用程序将数据插入 Hive 表时,它创建了一个小文件,每个文件的每一行数据。

下面是代码

// Define which topics to read from
  val topic = "topic_twitter"
  val groupId = "group-1"
  val consumer = KafkaConsumer(topic, groupId, "localhost:2181")

//Create SparkContext
  val sparkContext = new SparkContext("local[2]", "KafkaConsumer")

//Create HiveContext  
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)

  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING,   userLang STRING)")
  hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")

Hive 演示表已填充了一条记录。 Kafka 消费者循环遍历主题 ="topic_twitter"的数据,处理每一行并填充到 Hive 表中

val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK( 1," + 
    tweetID        +","  + 
    tweetText      +"," + 
    userName       +"," +
    tweetTimeStamp +","  +
    userLang + ") FROM demo limit 1"

hiveContext.sql(hiveSql)

下面是我的 Hadoop 环境中的图像。 twitter_data,演示 Hie Tables in HDFS

在 HDFS 中创建的最后 10 个文件 enter image description here

如您所见,文件大小不超过 200KB,有没有办法将这些文件合并到一个文件中?

最佳答案

[take 2] 好的,所以您无法将数据正确地“流式传输”到 Hive 中。但是你可以添加一个周期性的compaction post-processing job...

  • 创建包含 3 个分区的表,例如(role='collectA'), (role='collectB'), (role='archive')
  • 将您的 Spark 插入指向 (role='activeA')
  • 在某个时候,切换到 (role='activeB')
  • 然后转储您在“A”分区中收集的每条记录 进入“存档”,希望 Hive 默认配置能够很好地限制碎片

    INSERT INTO TABLE twitter_data PARTITION (role='archive') 选择 ... 来自 twitter_data WHERE role='activeA' ; 截断表 twitter_data 分区(角色='activeA') ;

  • 在某些时候,切换回“A”等。

最后一句话:如果 Hive 在每个压缩作业上仍然创建了太多文件,那么尝试 tweaking some parameters在您的 session 中,就在 INSERT 之前,例如

set hive.merge.mapfiles =true;
set hive.merge.mapredfiles =true;
set hive.merge.smallfiles.avgsize=1024000000;

关于hadoop - Hive 为 HDFS 中的每个插入创建多个小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32096347/

相关文章:

java - 在hadoop中进行mapreducing后,part-r-00000和成功文件为0 kb

eclipse - 映射减少分布式缓存

hadoop - 为什么 "Flatten"不是 PIG 中的 UDF?

java - apache spark 与 kafka 和 hive 集成的 Spark 流

hadoop - 在 hive 中按 id 收集数据

hadoop - 配置单元查询转储问题

hadoop - 在 Hadoop 中建模数据

apache - Hadoop 3.2.0在群集中不起作用(VirtualBox)

hadoop - 一些数据节点在清除 HDFS 后仍然显示使用的 block 池

hadoop - 每次重新启动集群时都无法访问 HDFS 中的文件?