下面已经实现了
- Kafka Producer 使用 Spark Streaming 从 Twitter 中提取数据。
- Kafka Consumer 将数据提取到 Hive 外部表(在 HDFS 上)。
虽然到目前为止一切正常。 我只面临一个问题,当我的应用程序将数据插入 Hive 表时,它创建了一个小文件,每个文件的每一行数据。
下面是代码
// Define which topics to read from
val topic = "topic_twitter"
val groupId = "group-1"
val consumer = KafkaConsumer(topic, groupId, "localhost:2181")
//Create SparkContext
val sparkContext = new SparkContext("local[2]", "KafkaConsumer")
//Create HiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")
Hive 演示表已填充了一条记录。 Kafka 消费者循环遍历主题 ="topic_twitter"的数据,处理每一行并填充到 Hive 表中
val hiveSql = "INSERT INTO TABLE twitter_data SELECT STACK( 1," +
tweetID +"," +
tweetText +"," +
userName +"," +
tweetTimeStamp +"," +
userLang + ") FROM demo limit 1"
hiveContext.sql(hiveSql)
下面是我的 Hadoop 环境中的图像。 twitter_data,演示
如您所见,文件大小不超过 200KB,有没有办法将这些文件合并到一个文件中?
最佳答案
[take 2] 好的,所以您无法将数据正确地“流式传输”到 Hive 中。但是你可以添加一个周期性的compaction post-processing job...
- 创建包含 3 个分区的表,例如
(role='collectA')
,(role='collectB')
,(role='archive')
- 将您的 Spark 插入指向
(role='activeA')
- 在某个时候,切换到
(role='activeB')
然后转储您在“A”分区中收集的每条记录 进入“存档”,希望 Hive 默认配置能够很好地限制碎片
INSERT INTO TABLE twitter_data PARTITION (role='archive') 选择 ... 来自 twitter_data WHERE role='activeA' ; 截断表 twitter_data 分区(角色='activeA') ;
在某些时候,切换回“A”等。
最后一句话:如果 Hive 在每个压缩作业上仍然创建了太多文件,那么尝试 tweaking some parameters在您的 session 中,就在 INSERT 之前,例如
set hive.merge.mapfiles =true;
set hive.merge.mapredfiles =true;
set hive.merge.smallfiles.avgsize=1024000000;
关于hadoop - Hive 为 HDFS 中的每个插入创建多个小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32096347/