hadoop - Apache Flume custom interceptor - binary and weird HDFS files

Tags: hadoop hadoop-streaming flume flume-ng

I am fairly new to the concept of Flume interceptors and am facing an issue: before the interceptor is applied, the files landed by the sink are plain text files, but after applying the interceptor everything goes horribly wrong.

My interceptor code is as follows -

package com.flume;

import org.apache.flume.*;
import org.apache.flume.interceptor.*;

import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CustomHostInterceptor implements Interceptor {

    private String hostValue;
    private String hostHeader;

    public CustomHostInterceptor(String hostHeader){
        this.hostHeader = hostHeader;
    }

    @Override
    public void initialize() {
        // At interceptor start up
        try {
            hostValue =
                    InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new FlumeException("Cannot get Hostname", e);
        }
    }

    @Override
    public Event intercept(Event event) {

        // This is the event's body
        String body = new String(event.getBody());
        if(body.toLowerCase().contains("text")){
            try {
                event.setBody("hadoop".getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // These are the event's headers
        Map<String, String> headers = event.getHeaders();

        // Enrich header with hostname
        headers.put(hostHeader, hostValue);

        // Let the enriched event go
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> interceptedEvents =
                new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Intercept any event
            Event interceptedEvent = intercept(event);
            interceptedEvents.add(interceptedEvent);
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
        // At interceptor shutdown
    }

    public static class Builder
            implements Interceptor.Builder {

        private String hostHeader;

        @Override
        public void configure(Context context) {
            // Retrieve property from flume conf
            hostHeader = context.getString("hostHeader");
        }

        @Override
        public Interceptor build() {
            return new CustomHostInterceptor(hostHeader);
        }
    }
}
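
For reference, the interceptor can be exercised outside the agent with a minimal harness like the sketch below (InterceptorSmokeTest is a hypothetical name chosen here; it assumes flume-ng-core is on the classpath and uses Flume's EventBuilder helper):

package com.flume;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Hypothetical stand-alone check; not part of the original question.
public class InterceptorSmokeTest {
    public static void main(String[] args) {
        CustomHostInterceptor interceptor = new CustomHostInterceptor("hostname");
        interceptor.initialize();

        // A body containing "text" should be rewritten to "hadoop",
        // and the hostname header should be added either way.
        Event event = EventBuilder.withBody("some text", StandardCharsets.UTF_8);
        Event out = interceptor.intercept(event);

        System.out.println(new String(out.getBody(), StandardCharsets.UTF_8)); // expect: hadoop
        System.out.println(out.getHeaders()); // expect: {hostname=<local hostname>}

        interceptor.close();
    }
}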

The Flume conf is -
agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/app.log
agent.sources.exec-source.interceptors = i1
agent.sources.exec-source.interceptors.i1.type = com.flume.CustomHostInterceptor$Builder
agent.sources.exec-source.interceptors.i1.hostHeader = hostname

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path=hdfs://localhost:8020/bosch/flume/applogs
agent.sinks.hdfs-sink.hdfs.filePrefix=logs
agent.sinks.hdfs-sink.hdfs.rollInterval=60
agent.sinks.hdfs-sink.hdfs.rollSize=0

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1
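
Note that the compiled interceptor class must be on the agent's classpath, for example by packaging it as a jar and dropping it into Flume's lib/ (or a plugins.d/) directory. The agent is then started the usual way; here the properties above are assumed to be saved as agent.conf (a file name chosen for illustration):

flume-ng agent --conf conf --conf-file agent.conf --name agent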

Doing a cat on the file created in HDFS gives -
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable���*q�CJv�/ESmP�ź
some textP�żc
some more textP���K
textP��ߌangels and deamonsP��%�
text bla blaP��1�angels and deamonsP��1�
testP��1�hmmmP��1�anything

Any suggestions?

Thanks

Best Answer

Nothing looks wrong with the interceptor.

The problem is in your Flume agent configuration.

You did not specify the hdfs.fileType property, so the sink falls back to its default, SequenceFile (which is exactly the SEQ!org.apache.hadoop.io.LongWritable header you are seeing in the output).

Try adding this line to your HDFS sink and let me know if it works:

agent.sinks.hdfs-sink.hdfs.fileType=DataStream 
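
With that in place, the sink section reads like this (same path and roll settings as in the question):

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path=hdfs://localhost:8020/bosch/flume/applogs
agent.sinks.hdfs-sink.hdfs.filePrefix=logs
agent.sinks.hdfs-sink.hdfs.rollInterval=60
agent.sinks.hdfs-sink.hdfs.rollSize=0
agent.sinks.hdfs-sink.hdfs.fileType=DataStream

After restarting the agent, a hdfs dfs -cat on a newly rolled file should show plain text instead of the SEQ! SequenceFile header.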

For hadoop - Apache Flume custom interceptor - binary and weird HDFS files, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33421863/
