hadoop - Flume ElasticSearchSink does not consume all messages

Tags: hadoop elasticsearch hdfs flume flume-ng

I am using Flume to ship log lines to HDFS and to index them into Elasticsearch with the ElasticSearchSink.

Here is my configuration:

agent.channels.memory-channel.type = memory

agent.sources.tail-source.type = exec
agent.sources.tail-source.command = tail -4000 /home/cto/hs_err_pid11679.log
agent.sources.tail-source.channels = memory-channel

agent.sinks.log-sink.channel = memory-channel
agent.sinks.log-sink.type = logger

#####INTERCEPTORS

agent.sources.tail-source.interceptors = timestampInterceptor
agent.sources.tail-source.interceptors.timestampInterceptor.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

####SINK
# Setting the sink to HDFS
agent.sinks.hdfs-sink.channel = memory-channel
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8020/data/flume/%y-%m-%d/
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.inUsePrefix = .
agent.sinks.hdfs-sink.hdfs.rollCount = 0
agent.sinks.hdfs-sink.hdfs.rollInterval = 0
agent.sinks.hdfs-sink.hdfs.rollSize = 10000000
agent.sinks.hdfs-sink.hdfs.idleTimeout = 10
agent.sinks.hdfs-sink.hdfs.writeFormat = Text

agent.sinks.elastic-sink.channel = memory-channel
agent.sinks.elastic-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent.sinks.elastic-sink.hostNames = 127.0.0.1:9300
agent.sinks.elastic-sink.indexName = flume_index
agent.sinks.elastic-sink.indexType = logs_type
agent.sinks.elastic-sink.clusterName = elasticsearch
agent.sinks.elastic-sink.batchSize = 500
agent.sinks.elastic-sink.ttl = 5d
agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer


# Finally, activate.
agent.channels = memory-channel
agent.sources = tail-source
agent.sinks = log-sink hdfs-sink elastic-sink
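For reference, an agent using this file can be started with the standard flume-ng launcher; the conf directory and properties file paths below are assumptions for illustration, not from the original post:

```shell
# Launch the Flume agent named "agent" (must match the property prefix above).
# --conf points at Flume's conf directory, --conf-file at this properties file.
flume-ng agent \
  --name agent \
  --conf /etc/flume/conf \
  --conf-file /etc/flume/conf/agent.properties \
  -Dflume.root.logger=INFO,console
```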

The problem is that in Kibana I see only 1-2 messages in Elasticsearch, while the HDFS files contain many messages.

Any idea what I am missing here?

Best answer

The problem was caused by a bug in the serializer.
If we remove this line:

agent.sinks.elastic-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

the messages are consumed without any problem.
The issue lies in how the @timestamp field is created when that serializer is used.
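With the serializer property removed, the sink falls back to Flume's default serializer for the ElasticSearchSink (ElasticSearchLogStashEventSerializer in Flume 1.x), which writes @timestamp in the Logstash-compatible form Kibana expects. A working sink section would then look like this, keeping the same host/index values as above:

```properties
agent.sinks.elastic-sink.channel = memory-channel
agent.sinks.elastic-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent.sinks.elastic-sink.hostNames = 127.0.0.1:9300
agent.sinks.elastic-sink.indexName = flume_index
agent.sinks.elastic-sink.indexType = logs_type
agent.sinks.elastic-sink.clusterName = elasticsearch
agent.sinks.elastic-sink.batchSize = 500
agent.sinks.elastic-sink.ttl = 5d
# No serializer property: Flume uses its default Logstash-style serializer.
```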

Regarding "hadoop - Flume ElasticSearchSink does not consume all messages", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32439051/
