hadoop - 使用 Pig 加载 Json 数据

标签 hadoop mapreduce apache-pig bigdata cloudera

我正在尝试使用 jsonLoader() 从下面提到的 json 格式中提取数据:

{"Partition":"10","Key":"618897","Properties2":[{"K":"A","T":"String","V":"M "}, {"K":"B","T":"String","V":"N"}, {"K":"D","T":"String","V":"O"}]}
{"Partition":"11","Key":"618900","Properties2":[{"K":"A","T":"String","V":"W”"},{"K":"B","T":"String","V":"X"}, {"K":"C","T":"String","V":"Y"},{"K":"D","T":"String","V":"Z"}]}

现在我可以使用以下代码从每个数组对象的“partition”、“key”和“V”中提取数据:

A= LOAD '/home/hduser/abc.jon' Using JsonLoader('Partition:chararray,Key:chararray,Properties2:{(K:chararray,T:chararray,V:chararray)},Timestamp:chararray');
B= foreach A generate $0,$1,BagToString(Properties2.V,'\t') as vl:chararray; 
store B into './Result/outPut2';

从上面的代码中,我在序列基础上而不是在列基础上得到“Properties2”数组值,每当序列改变或新对象出现时它就会产生问题。 请帮我根据列(K值)提取数据。

我的输出 enter image description here

预期输出 enter image description here

提前致谢

最佳答案

这里有两个选择

1.使用elephant-bird这将为您提供键和值的映射。

A = LOAD '/apps/pig/json_sample' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') as (json:map[]);
B = FOREACH A GENERATE json#'Partition',json#'Key',json#'Properties2';
dump B;

会给你一个输出:

(10,618897,{([T#String,K#A,V#M ]),([T#String,K#B,V#N]),([T#String,K#D,V#O])})
(11,618900,{([T#String,K#A,V#W”]),([T#String,K#B,V#X]),([T#String,K#C,V#Y]),([T#String,K#D,V#Z])})

或者您必须编写一个必须执行此操作的自定义加载程序

a). 它应该知道即将到来的值的正确顺序是什么 对于键 K

b)。检查每个值,查看 json 是否缺少任何此键,并为该位置返回一个空/空字符。

我正在发布 CustomJsonLoader 的 getNext() 方法,它将执行相同的操作:

@Override
public Tuple getNext() throws IOException {
    // TODO Auto-generated method stub
    try {
        boolean notDone = in.nextKeyValue();
        if (!notDone) {
            return null;
        }
        Text value = (Text) in.getCurrentValue();
        List<String> valueList = new ArrayList<String>();
        if (value != null) {

            String jsonString = value.toString();
            System.out.println(jsonString);
            JSONParser parser = new JSONParser();
            JSONObject obj = null;
            try {
                obj = (JSONObject) parser.parse(jsonString);
            } catch (ParseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("obj is "+obj);
            if (obj != null) {
                String partition = (String) obj.get("Partition");
                String key = (String) obj.get("Key");
                valueList.add(partition);
                valueList.add(key);
                JSONArray innArr = (JSONArray) obj.get("Properties2");
                char[] innKeys = new char[] { 'A', 'B', 'C', 'D' };
                Map<String,String> keyMap = new HashMap<String,String>();
                for (Object innObj : innArr) {
                    JSONObject jsonObj = (JSONObject) innObj;
                    keyMap.put(jsonObj.get("K")+"",jsonObj.get("V")+"");
                }
                for (int i = 0; i < innKeys.length; i++) {
                    char ch = innKeys[i];
                    if (keyMap.containsKey(ch+"")) {
                        valueList.add(keyMap.get(ch+""));
                    }else{
                        valueList.add("");
                    }

                }
                Tuple t = tupleFactory.newTuple(valueList);
                return t;
            }
        }

        return null;
    } catch (InterruptedException e) {
    }
}

注册并运行:

REGISTER udf/CustomJsonLoader.jar
A = LOAD '/apps/pig/json_sample' USING CustomJsonLoader();
DUMP A;
(10,618897,M,N,,O)
(11,618900,W,X,Y,Z)

希望这对您有所帮助!

关于hadoop - 使用 Pig 加载 Json 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28367095/

相关文章:

java - 使用 hadoop mapreduce 识别文件夹中的文件扩展名以处理文件

hadoop - Windows 上的 HIVE 安装

apache-spark - 将2TB的压缩多行JSON转换为ND JSON

hadoop - Hadoop Map Reduce 和 Google Map Reduce 之间的区别

hadoop - Pig 10.0 - 将元组分组并合并到 foreach 中

java - 矩阵XX'的MapReduce代码

hadoop - Hadoop:在同一数据或ChainMap上运行两个M/R作业,但存在同步障碍

scala - 如何在 spark-scala 中将 Iterable[String] 保存到 hdfs

hadoop - 在 hive 中按 id 收集数据

hadoop - 是否可以以将多行作为单个输入元组处理的方式使用 Pig 流式处理 (StreamToPig)?