java - XML parsing in Hadoop MapReduce

Tags: java xml hadoop xml-parsing mapreduce

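I wrote a MapReduce job that parses XML into CSV.
But after running the job, there is no output in my output directory.
I'm not sure whether the file is not being read or the output is not being written. I'm new to Hadoop MapReduce.

Can you help?

Here is my entire code: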

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class XmlParser11 {

    /** Input format that emits one record per start-tag ... end-tag block. */
    public static class XmlInputFormat1 extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new XmlRecordReader();
        }

        public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private final DataOutputBuffer buffer = new DataOutputBuffer();

            private final LongWritable key = new LongWritable();
            private final Text value = new Text();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes(StandardCharsets.UTF_8);
                endTag = conf.get(END_TAG_KEY).getBytes(StandardCharsets.UTF_8);

                // Open the file and seek to the start of this split.
                FileSplit fileSplit = (FileSplit) split;
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(file);
                fsin.seek(start);
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                // Only start a new record if its start tag begins inside this split.
                if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            key.set(fsin.getPos());
                            value.set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
                return false;
            }

            @Override
            public LongWritable getCurrentKey() {
                return key;
            }

            @Override
            public Text getCurrentValue() {
                return value;
            }

            @Override
            public float getProgress() throws IOException {
                return (fsin.getPos() - start) / (float) (end - start);
            }

            @Override
            public void close() throws IOException {
                fsin.close();
            }

            /**
             * Reads byte by byte until {@code match} is found. Inside a record
             * (withinBlock == true) every byte is copied to the buffer; outside
             * a record, it gives up once the read position passes the split end.
             */
            private boolean readUntilMatch(byte[] match, boolean withinBlock)
                    throws IOException {
                int i = 0;
                while (true) {
                    int b = fsin.read();
                    if (b == -1) {
                        return false; // end of file
                    }
                    if (withinBlock) {
                        buffer.write(b);
                    }
                    if (b == (match[i] & 0xFF)) { // compare as unsigned bytes
                        i++;
                        if (i >= match.length) {
                            return true;
                        }
                    } else {
                        i = 0;
                    }
                    if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                        return false; // passed the end of the split
                    }
                }
            }
        }
    }

    /** Turns one XML record into a comma-separated line of element:attribute-value pairs. */
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
        private final XMLInputFactory xmlif = XMLInputFactory.newInstance();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The value is the XML fragment itself, not a file name.
            String document = value.toString();
            StringBuilder line = new StringBuilder();
            try {
                XMLStreamReader xmlr = xmlif.createXMLStreamReader(new StringReader(document));
                try {
                    while (xmlr.hasNext()) {
                        if (xmlr.next() == XMLStreamConstants.START_ELEMENT) {
                            appendAttributes(xmlr, line);
                        }
                    }
                } finally {
                    xmlr.close();
                }
            } catch (XMLStreamException e) {
                throw new IOException("Malformed XML record ending at byte " + key.get(), e);
            }
            context.write(NullWritable.get(), new Text(line.toString()));
        }

        // Appends "element:attribute-value" for each attribute of the current start element.
        private static void appendAttributes(XMLStreamReader xmlr, StringBuilder line) {
            String localName = xmlr.getLocalName();
            for (int i = 0; i < xmlr.getAttributeCount(); i++) {
                if (line.length() > 0) {
                    line.append(',');
                }
                line.append(localName).append(':')
                    .append(xmlr.getAttributeLocalName(i)).append('-')
                    .append(xmlr.getAttributeValue(i));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("xmlinput.start", "<FICHER>");
        conf.set("xmlinput.end", "</FICHER>");

        Job job = Job.getInstance(conf, "XmlParser11");
        job.setJarByClass(XmlParser11.class);
        job.setMapperClass(XmlParser11.Map.class);
        job.setNumReduceTasks(0); // map-only job: mapper output goes straight to the output files
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(XmlInputFormat1.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
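The mapper's per-record step can be tried outside Hadoop. The sketch below is a standalone, hypothetical `RecordToCsv` class (not part of the question's code) that uses only the JDK's StAX API. It reads one record from a `StringReader`, because the value passed to `map()` is the XML fragment itself rather than a file path, and it joins the attributes of every start element as `element:attribute-value` pairs, which is what the question's `print()` method appears to be building. The sample record is made up.

```java
import java.io.StringReader;
import java.util.StringJoiner;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class RecordToCsv {

    // Flattens the attributes of every start element in one XML fragment into
    // "element:attribute-value" pairs separated by commas.
    static String toCsv(String fragment) {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        StringJoiner line = new StringJoiner(",");
        try {
            // The fragment is XML text, so it is read from a StringReader, not a FileReader.
            XMLStreamReader xmlr = factory.createXMLStreamReader(new StringReader(fragment));
            try {
                while (xmlr.hasNext()) {
                    if (xmlr.next() == XMLStreamConstants.START_ELEMENT) {
                        String element = xmlr.getLocalName();
                        for (int i = 0; i < xmlr.getAttributeCount(); i++) {
                            line.add(element + ":" + xmlr.getAttributeLocalName(i)
                                    + "-" + xmlr.getAttributeValue(i));
                        }
                    }
                }
            } finally {
                xmlr.close();
            }
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Not well-formed XML: " + fragment, e);
        }
        return line.toString();
    }

    public static void main(String[] args) {
        // Hypothetical record, shaped like what the record reader passes to map().
        String record = "<FICHER id=\"1\"><NOM lang=\"fr\">a</NOM></FICHER>";
        System.out.println(toCsv(record)); // FICHER:id-1,NOM:lang-fr
    }
}
```

Parsing each record independently like this assumes every `FICHER` block is well-formed on its own; a fragment that is not makes `toCsv` throw an `IllegalArgumentException`.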

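Here is the output from PuTTY:

```text
File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=120678
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1762671
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=5
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
Job Counters
    Launched map tasks=1
    Rack-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=15960
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=3990
    Total vcore-seconds taken by all map tasks=3990
    Total megabyte-seconds taken by all map tasks=16343040
Map-Reduce Framework
    Map input records=0
    Map output records=0
    Input split bytes=124
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=0
    CPU time spent (ms)=1390
    Physical memory (bytes) snapshot=513449984
    Virtual memory (bytes) snapshot=4122763264
    Total committed heap usage (bytes)=2058354688
File Input Format Counters
    Bytes Read=1762547
File Output Format Counters
    Bytes Written=0
```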

Best answer

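I think the problem is the start tag. The record reader compares the configured start tag byte for byte, so `<FICHER>` only matches elements written exactly as `<FICHER>`. If your `FICHER` elements carry attributes (for example `<FICHER id="...">`), no record is ever found, which would explain why the map input record count above is 0. Drop the closing `>` from the start tag:

```java
conf.set("xmlinput.start", "<FICHER");
conf.set("xmlinput.end", "</FICHER>");
```

Hope this helps.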
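To see why the trailing `>` matters, the sketch below copies the byte-matching loop of the question's `readUntilMatch` into a standalone class that works on an in-memory string instead of an HDFS split. The class name `TagMatchDemo`, the `records` helper and the sample XML are hypothetical, and the split-end check is left out because the whole string is treated as one split. With the sample's `FICHER` elements carrying an `id` attribute, `<FICHER>` finds nothing while `<FICHER` finds both records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TagMatchDemo {

    // Mirrors the matching loop of XmlRecordReader.readUntilMatch (bytes compared
    // as unsigned), without the split-end check: a mismatching byte resets i to 0.
    static boolean readUntilMatch(ByteArrayInputStream in, byte[] match,
                                  boolean withinBlock, ByteArrayOutputStream buffer) {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;                 // end of input
            }
            if (withinBlock) {
                buffer.write(b);              // keep every byte inside a record
            }
            if (b == (match[i] & 0xFF)) {
                i++;
                if (i >= match.length) {
                    return true;              // whole tag matched
                }
            } else {
                i = 0;
            }
        }
    }

    // Returns every startTag ... endTag block in xml, as the record reader would emit them.
    static List<String> records(String xml, String startTag, String endTag) {
        ByteArrayInputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        List<String> found = new ArrayList<>();
        while (readUntilMatch(in, start, false, buffer)) {
            buffer.reset();
            buffer.write(start, 0, start.length); // the start tag was consumed, so re-add it
            if (!readUntilMatch(in, end, true, buffer)) {
                break;                            // no closing tag before end of input
            }
            found.add(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        }
        return found;
    }

    public static void main(String[] args) {
        // Hypothetical input whose FICHER elements carry an attribute.
        String xml = "<root><FICHER id=\"1\"><NOM>a</NOM></FICHER>"
                   + "<FICHER id=\"2\"><NOM>b</NOM></FICHER></root>";
        System.out.println(records(xml, "<FICHER>", "</FICHER>").size()); // 0
        System.out.println(records(xml, "<FICHER", "</FICHER>").size());  // 2
        // Prefix caveat: "<FICHER" also matches a hypothetical <FICHERS> root.
        System.out.println(records("<FICHERS><FICHER id=\"1\">x</FICHER></FICHERS>",
                                   "<FICHER", "</FICHER>").get(0));
        // <FICHERS><FICHER id="1">x</FICHER>
    }
}
```

One caveat of the shorter tag: `<FICHER` is a prefix match, so it also fires on any element whose name starts with `FICHER`. With a hypothetical `<FICHERS>` root, the first record starts at the root tag and comes out malformed, as the last line of the demo shows. If the data can contain such names, a reader that also checks the byte after the tag (`>` or whitespace) would be needed; that check is not part of the reader in the question.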

For the original Stack Overflow question on XML parsing in Hadoop MapReduce, see: https://stackoverflow.com/questions/32608540/

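Related articles:

java - JAXB and XSLT processor

sql - Merging in SQL when creating an XML file

scala - Loading a local file into Spark with sc.textFile()

java - Unknown databind class

java - android loadIcon produces outOfMemoryError

java - HttpURLConnection GET request with http-header "Accept"

hadoop - Google Cloud Storage

java - Swapping out the center JPanel in a BorderLayout

java - How to capture multiple occurrences of XML into a POJO when unmarshalling?

sql - hive simple regular expression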