hadoop - PIG UDF 处理拆分成不同映射器的多行元组

标签 hadoop amazon-web-services mapreduce user-defined-functions apache-pig

我有一个文件,其中每个元组跨越多行,例如:

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

以上是我文件中的 2 个元组。我编写了定义 getNext() 函数的 UDF,该函数检查它是否开始,然后我将初始化我的元组;如果是 END 那么我将返回元组(来自字符串缓冲区);否则我只会将字符串添加到字符串缓冲区。

它适用于小于 HDFS block 大小 64 MB(在 Amazon EMR 上)的文件大小,而对于大于此大小的文件,它将失败。我尝试用谷歌搜索,找到这个 blog post . Raja 的解释很容易理解,他提供了示例代码。但是代码正在实现 RecordReader 部分,而不是 pig LoadFuncgetNext()。只是想知道是否有人有处理多行 pig 元组拆分问题的经验?我应该继续在 Pig 中实现 RecordReader 吗?如果是,怎么办?

谢谢。

最佳答案

您可以像 Guy 提到的那样预处理您的输入,或者可以应用描述的其他技巧 here .

我认为最干净的解决方案是实现自定义 InputFormat (与其 RecordReader 一起)创建一条记录/START-END。 pig 的LoadFunc位于 Hadoop 的 InputFormat 的顶部,因此您可以定义 LoadFunc 将使用的 InputFormat。
自定义 LoadFunc 的原始骨架实现如下所示:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new MyInputFormat(); //custom InputFormat
    }

    @Override
    public Tuple getNext() {
        Tuple result = null;
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            //value can be a custom Writable containing your name/value 
            //field pairs for a given record
            Object value = reader.getCurrentValue();
            result = tupleFactory.newTuple();
            // ...
            //append fields to tuple
        }
        catch (Exception e) {
            // ...
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) 
      throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

LoadFunc 初始化InputFormat 及其RecordReader 之后,它定位到您的数据的输入位置并开始从recordReader 获取记录,创建生成的元组 (getNext()),直到输入被完全读取。

关于自定义 InputFormat 的一些说明:

我会创建一个自定义的 InputFormat,其中 RecordReader 是 org.apache.hadoop.mapreduce.lib.input.LineRecordReader:大多数方法会 保持不变,除了 initialize():它会调用一个自定义的 LineReader (基于 org.apache.hadoop.util.LineReader)。 InputFormat 的键是行偏移量(长整型),值是 自定义 可写。这会将记录的字段(即 START-END 之间的数据)保存为键值对列表。每次调用 RecordReader 的 nextKeyValue() 时,LineReader 都会将记录写入自定义 Writable。整个事情的要点是你如何 实现 LineReader.readLine()

另一个可能更简单的方法是更改​​ TextInputFormat 的分隔符(它在 Hadoop 0.23 中是可配置的,参见 textinputformat.record.delimiter) 适合您的数据结构的一个(如果可能的话)。在这种情况下,您最终会将数据保存在 Text 中,您需要从中拆分和提取 KV 对并放入元组中。

关于hadoop - PIG UDF 处理拆分成不同映射器的多行元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13871134/

相关文章:

javascript - 在将文件上传到 S3 之前重命名文件

amazon-web-services - 将一个 aws 帐户的配置复制到另一个

amazon-web-services - 在 AWS 上测试 Lambda 函数时遇到问题

hadoop - 相当于FIRST和LAST的HIVE

ubuntu上hadoop单节点集群安装

Hadoop MapReduce vs MPI(vs Spark vs Mahout vs Mesos)——什么时候使用其中一个?

java - map 的输出将一个键的值合在一起

java - 在 Hadoop DistributedCache 上存储 TreeSet

sql - 根据日期计算某行最大值与其他行的差值

hadoop - Amazon Web Services Map-Reduce 错误 Illegal character in path at index