apache - 格式化 Apache Flume HDFS 序列化程序

标签 apache hadoop flume flume-ng

我刚刚开始使用 flume,需要将一些 header 插入到 hdfs 接收器中。

虽然格式错误,但我可以正常工作,而且我无法控制列。

使用这个配置:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogudp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.r1.interceptors.i2.preserveExisting = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/vagrant/syslog/%y-%m-%d/
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k1.serializer = header_and_text
a1.sinks.k1.serializer.columns = timestamp hostname
a1.sinks.k1.serializer.format = CSV
a1.sinks.k1.serializer.appendNewline = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

写入HDFS的日志,除了序列化方面,主要是没问题的:

{timestamp=1415574695138, Severity=6, host=PolkaSpots, Facility=3, hostname=127.0.1.1} hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

如何格式化日志,使它们看起来像这样:

1415574695138 127.0.1.1 hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

首先是时间戳,然后是主机名,然后是系统日志消息正文。

最佳答案

原因是您配置的两个拦截器正在将值写入 Flume 事件 header ,这些 header 由 HeaderAndBodyTextEventSerializer 序列化到正文。后者只是这样做:

public void write(Event e) throws IOException {
    out.write((e.getHeaders() + " ").getBytes());
    out.write(e.getBody());
    if (appendNewline) {
      out.write('\n');
    }
  }

委托(delegate)给 e.getHeaders() 只会将 map 序列化为 JSON 字符串。

要解决此问题,我建议创建您自己的序列化程序并重载 write() 方法以将输出格式化为制表符分隔值。 在这种情况下,您只需要在以下位置指定您的类的路径:

a1.sinks.k1.serializer = com.mycompany.MySerlizer

然后将 jar 放到 Flume 的类路径中。

关于apache - 格式化 Apache Flume HDFS 序列化程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26834571/

相关文章:

java - 在 Apache Flume 1.5 中的何处放置自定义 jar?

flume - Flume-ng null 事件的自定义接收器

PHP 无法在 Windows 上的 Apache 2 上运行

java - Hadoop ReflectionUtils 可以初始化枚举吗?

javascript - 从 http ://localhost:8080 请求时不允许 Apache 服务器方法

apache-spark - Snappydata 和外部 Hive 兼容性

hadoop - 不带 reducer 的Mapper作业中的文件数

flume - 在被拦截器拦截之前复制 channel

python - SCRIPT_NAME 与 apache cgid 模块中的 web.py 不匹配 REQUEST_URI

apache - mod_rewrite 和图像重定向