hadoop - Pig 中的 SimpleTextLoader UDF

标签 hadoop mapreduce apache-pig

我想为 Pig UDF 创建一个自定义加载函数,我已经使用链接创建了一个 SimpleTextLoader

https://pig.apache.org/docs/r0.11.0/udf.html ,我已经成功地为此代码生成了 jar 文件,在 pig 中注册并运行 Pig 脚本。我得到的是空输出。我不知道如何解决这个问题,我们将不胜感激。

下面是我的Java代码

public class SimpleTextLoader extends LoadFunc{
    protected RecordReader in = null;
    private byte fieldDel = '\t';
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final int BUFFER_SIZE = 1024;
      public SimpleTextLoader() {
    }

public SimpleTextLoader(String delimiter) 
{
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;

            case 'x':
               fieldDel =
                    Integer.valueOf(delimiter.substring(2), 16).byteValue();
               break;

            case 'u':
                this.fieldDel =
                    Integer.valueOf(delimiter.substring(2)).byteValue();
                break;

            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimeter must be a single character");
        }
    }

private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }

        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }    @Override
    public Tuple getNext() throws IOException {
         try {
            boolean notDone = in.nextKeyValue();
            if (notDone) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            System.out.println("printing value"  +value);
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;

            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);

            Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;
            System.out.println(t);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

    }


    @Override
    public void setLocation(String string, Job job) throws IOException {
        FileInputFormat.setInputPaths(job,string);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
      return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit ps) throws IOException {
        in=reader;
    }

}

下面是我的 Pig 脚本

REGISTER /home/hadoop/netbeans/sampleloader/dist/sampleloader.jar
a= load '/input.txt' using sampleloader.SimpleTextLoader();
store a into 'output';

最佳答案

您正在使用 sampleloader.SimpleTextLoader(),它不执行任何操作,因为它只是一个空的构造函数。
而是使用执行实际拆分操作的 sampleloader.SimpleTextLoader(String delimiter)

关于hadoop - Pig 中的 SimpleTextLoader UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27123625/

相关文章:

hadoop - 在Pig中处理数据结构

hadoop - 统计 GROUP BY 中 PIG 查询和 MySql 查询结果的差异

hadoop - Hadoop存档命令

hadoop - 重命名 PIG 输出的部分文件

java - Map ->Reduce ->Reduce(顺序调用两个reducer)-如何配置驱动程序

hadoop - Apache PIG-错误org.apache.pig.impl.PigContext-在第1行第1列遇到 “<OTHER> ”,= “”

hadoop - 执行使用UNION和RANK的PIG脚本。 (错误2017:内部错误创建作业配置。)

hadoop - 在数据仓库中处理大数据

hadoop - hadoop流-file选项以传递多个文件

java - YarnClient 运行时异常 : java. lang.NoClassDefFoundError: org/apache/hadoop/HadoopIllegalArgumentException