hadoop - 如何使用 Flume 在源上执行预处理并在 hdfs 接收器中保留真实文件名

标签 hadoop hdfs flume flume-ng

我是 Apache Flume 的新手,我很难理解它的确切工作原理。为了说明我的问题,所以我说明了我的需求和我做了什么。

我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。

我确定“假脱机目录”源和 HDFS 接收器是我需要的。那就是给我这个 flume.conf 文件

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.writeFormat=Text    

#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

结果是输入文件在我的本地文件系统上被重命名为“.complete”,数据被上传到 HDFS 上,新名称我猜是唯一的,由 Flume 生成。

这几乎是我所需要的。

但在上传之前,我想做一些文件特定的操作(删除标题,转义逗号..)。不知道怎么弄,想着用拦截器。但是当数据在水槽中时,它会在事件中转换并流式传输。在他看来,对文件一无所知。

否则,原始时间事件写在文件名中,所以我希望这次时间与我的事件而不是当前日期相关联。

我还想在 hdfs 中保留原始文件名(里面有一些有用的信息)。

有人有什么建议可以帮助我吗?

最佳答案

如果您指定,原始文件名可以保留为标题

agent.sources.seqGenSrc.fileHeader=true 

然后可以在您的水槽中取回。

如果您想在文件中操作数据,请使用拦截器。您应该知道,事件基本上是假脱机目录中文件中的一行。

最后但同样重要的是,您需要使用 fileHeader 属性将事件通过管道返回到正确的文件。这可以通过在您的接收器中指定路径来实现,如下所示:

agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data/%{file}

您可以使用 Prefix 和 Suffix 进一步配置文件名:

hdfs.filePrefix FlumeData   Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix –   Suffix to append to file (eg .avro - NOTE: period is not automatically added)

关于hadoop - 如何使用 Flume 在源上执行预处理并在 hdfs 接收器中保留真实文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27621680/

相关文章:

java - Flume的HttpSource : is the Jetty server multithread?

logging - 以 "realtime"的速度重播日志文件?

python - 如何在AWS Elastic MapReduce上使用Python流创建 “side-effect”文件?

Hadoop任务进度

apache-spark - PySpark:如何在读取 Parquet 时读取分区列

hadoop - 边缘节点 hortonworks 使用

csv - 使用水槽将 csv 文件传输到 hdfs,并将它们转换为 avro

java - HBase表上普通Java程序和MapReduce java程序的区别

hadoop - 该目录已经锁定 hadoop

hadoop - 实时数据集成 Kafka、Hadoop、Avro、HDFS是如何组合在一起的,数据集成有哪些架构