hadoop - 如何使用Flume将数据实时写入HDFS?

标签 hadoop hive hdfs bigdata flume

我正在使用 Flume 将传感器数据存储在 HDFS 中。一旦通过 MQTT 接收到数据。订阅者将 JSON 格式的数据发布到 Flume HTTP 监听器。它目前工作正常,但问题是水槽在我停止它之前不会写入 HDFS 文件(或者文件大小达到 128MB)。我正在使用 Hive 在读取时应用模式。不幸的是,生成的配置单元表仅包含 1 个条目。这是正常的,因为 Flume 没有将新的数据写入文件(由 Hive 加载)。

有什么方法可以强制 Flume 以近乎实时的方式将即将到来的新数据写入 HDFS?所以,我不需要重新启动它或使用小文件?

here is my flume configuration:

# Name the components on this agent
emsFlumeAgent.sources = http_emsFlumeAgent
emsFlumeAgent.sinks = hdfs_sink
emsFlumeAgent.channels = channel_hdfs

# Describe/configure the source
emsFlumeAgent.sources.http_emsFlumeAgent.type = http
emsFlumeAgent.sources.http_emsFlumeAgent.bind = localhost
emsFlumeAgent.sources.http_emsFlumeAgent.port = 41414

# Describe the sink
emsFlumeAgent.sinks.hdfs_sink.type = hdfs
emsFlumeAgent.sinks.hdfs_sink.hdfs.path = hdfs://localhost:9000/EMS/%{sensor}
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollInterval = 0
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollSize = 134217728
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollCount=0

#emsFlumeAgent.sinks.hdfs_sink.hdfs.idleTimeout=20
# Use a channel which buffers events in memory
emsFlumeAgent.channels.channel_hdfs.type = memory
emsFlumeAgent.channels.channel_hdfs.capacity = 10000
emsFlumeAgent.channels.channel_hdfs.transactionCapacity = 100

# Bind the source and sinks to the channel
emsFlumeAgent.sources.http_emsFlumeAgent.channels = channel_hdfs 
emsFlumeAgent.sinks.hdfs_sink.channel = channel_hdfs

最佳答案

我认为这里的棘手之处在于您希望近乎实时地将数据写入 HDFS 但又不想要小文件(出于明显的原因),这可能是一件很难实现的事情。

您需要在以下两个参数之间找到最佳平衡点:

hdfs.rollSize(默认 = 1024) - 触发滚动的文件大小,以字节为单位(0:从不根据文件大小滚动)

hdfs.batchSize(默认 = 100) - 在刷新到 HDFS 之前写入文件的事件数

如果您的数据不太可能在首选持续时间内达到 128 MB,那么您可能需要减小 rollSize,但仅限于不会遇到 small files problem 的程度。 .

因为您没有在 HDFS sink 中设置任何批处理大小,您应该会在每 100 条记录后看到 HDFS 刷新的结果,但是一旦刷新记录的大小共同达到 128 MB,内容将汇总到一个128 MB 文件。 这也没有发生吗?你能确认一下吗?

希望这对您有所帮助!

关于hadoop - 如何使用Flume将数据实时写入HDFS?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52839209/

相关文章:

hadoop - Hive与Hadoop的连接

hadoop - pig 拉丁语-从单行输入创建多行输出

hive - 如何在 Hive 中分组集后 reshape 数据?

hadoop - 确定配置单元表中的存储桶数?

java - HBase性能调优因素

java - 指定了无效的Hadoop运行时-Eclipse

带有 Hive 连接的 Spring-boot-application 不会启动嵌入式 Tomcat

git - 反之亦然如何将HDFS中的文件获取/复制到Git存储库中?

hadoop - 大数据 RDBMS

hadoop - 如何使用 JAVA API 在 HDFS 中移动或复制文件