java - 如何使用storm将数据持久化到HDFS

标签 java hadoop hdfs apache-storm

我有一个简单的 bolt,它从 kafka spout 读取数据,然后将数据写入 HDFS 目录。问题是在集群停止之前,bolt 不会写入。我如何确保 bolt 从 kafka spout 读取一个元组,然后立即将其写入 HDFS,或者至少写入每个“n”条目。 (我用的是CDH 4.4,Hadoop 2.0)

bolt 的java:

public class PrinterBolt10 extends BaseRichBolt{  
    private OutputCollector collector;
    private String values;
    Configuration configuration = null;
    FileSystem hdfs = null;
    FSDataOutputStream outputStream=null;
    BufferedWriter br = null; 
    List<String> valList;
    String machineValue;
    int upTime;
    int downTime;
    int idleTime; 

    public void prepare(Map config, TopologyContext context,OutputCollector collector) {
        upTime=0;
        downTime=0;
        idleTime=0;
        this.collector = collector;
        String timeStamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime());
        try{
            configuration = new Configuration();
            configuration.set("fs.defaultFS", "hdfs://localhost.localdomain:8020");
            hdfs =FileSystem.get(configuration);
            outputStream = hdfs.create(new Path("/tmp/storm/StormHdfs/machine10_"+timeStamp+".txt"));
            br = new BufferedWriter( new OutputStreamWriter( outputStream , "UTF-8" ) );
            br.flush(); 
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void execute(Tuple tuple) {  
        values = tuple.toString();
        int start = values.indexOf('[');
        int end = values.indexOf(']'); 
        machineValue=values.substring(start+1,end); 
        String machine=machineValue.substring(0,machineValue.indexOf(','));
        String code = machineValue.substring(machineValue.indexOf(',')+1);
        int codeInt = Integer.parseInt(code);
        if(codeInt==0) idleTime+=30;
        elseif(codeInt==1) upTime+=30;
        else downTime+=30; 
        String finalMessage = machine + " "+ "upTime(s) :" + upTime+" "+ "idleTime(s): "+idleTime+" "+"downTime: "+downTime;  
        try {
            br.write(finalMessage);  // *This is the writing part into HDFS*
            br.write('\n'); 
            br.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    public void cleanup() {}
}

最佳答案

编辑:完全改变了我的答案。

您需要使用HdfsBolt,而不是依靠自己编写文件。使用 HdfsBolt 消除了计算何时刷新到文件、打开缓冲流等的所有复杂问题。参见 http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html ,但您感兴趣的部分是:

// Use pipe as record boundary
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

//Synchronize data buffer with the filesystem every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach five MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
     .withFsURL("hdfs://localhost:54310")
     .withFileNameFormat(fileNameFormat)
     .withRecordFormat(format)
     .withRotationPolicy(rotationPolicy)
     .withSyncPolicy(syncPolicy);

然后只需将当前 bolt 中的数据传递到这个 bolt 中即可。

关于java - 如何使用storm将数据持久化到HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28559795/

相关文章:

sql - 根据 Hive 中 2 个源表的一些规则更新目标中的 "flag"

java - $HADOOP_COMMON_HOME 和 $HADOOP_HDFS_HOME 是同一个值吗?

java - 当对象不为 null 时构造函数抛出 NPE [JAVA]

java - 服务器发送事件 + Java 与 Spring MVC

java - 完成链表排序后,我不确定如何跳出循环

java - 传入数据字节数

hadoop - 只能写入 1 minReplication 节点中的 0 个。有 0 个数据节点正在运行,并且在此操作中排除了 0 个节点

hadoop - hive :动态分区

hadoop - 释放hadoop中的 "Non-DFS used"空间

hadoop - 如何将hdfs block 大小设置为1 GB?