java - Hadoop MapReduce 查询大型 json 数据

标签 java json hadoop

此处为 Hadoop n00b。

我在服务器上安装了 Hadoop 2.6.0,我在其中存储了 12 个 json 文件,我想对其执行 MapReduce 操作。这些文件很大,每个文件有 2-5 GB 不等。

JSON 文件的结构是一个 JSON 对象数组。以下两个对象的片段:

[{"campus":"Gløshaugen","building":"Varmeteknisk og Kjelhuset","floor":"4. etasje","timestamp":1412121618,"dayOfWeek":3,"hourOfDay":2,"latitude":63.419161638078066,"salt_timestamp":1412121602,"longitude":10.404867443910122,"id":"961","accuracy":56.083199914753536},{"campus":"Gløshaugen","building":"IT-Vest","floor":"2. etasje","timestamp":1412121612,"dayOfWeek":3,"hourOfDay":2,"latitude":63.41709424828986,"salt_timestamp":1412121602,"longitude":10.402167488838765,"id":"982","accuracy":7.315199988880896}]

我想根据字段buildingtimestamp 执行MapReduce 操作。至少在一开始,直到我掌握了这个窍门。例如。 mapReduce构建等于一个参数,timestamp大于X小于Y的数据,reduce处理后我需要的相关字段是纬度和经度。

我知道有不同的工具(Hive、HBase、PIG、Spark 等)可以与 Hadoop 一起使用,可能更容易解决这个问题,但我的老板想要评估独立 Hadoop 的 MapReduce 性能。

到目前为止,我已经创建了触发 map 和 reduce 类的主类,实现了我认为是 map 类的开始,但我仍然停留在 reduce 类上。以下是我目前所拥有的。

public class Hadoop {

    public static void main(String[] args) throws Exception {

        try {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "maze");
            job.setJarByClass(Hadoop.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path inPath = new Path("hdfs://xxx.xxx.106.23:50070/data.json");

            FileInputFormat.addInputPath(job, inPath);

            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

映射器:

public class Map extends org.apache.hadoop.mapreduce.Mapper{
    private Text word = new Text();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        try {
            JSONObject jo = new JSONObject(value.toString());
            String latitude = jo.getString("latitude");
            String longitude = jo.getString("longitude");
            long timestamp = jo.getLong("timestamp");
            String building = jo.getString("building");
            StringBuilder sb = new StringBuilder();

            sb.append(latitude);
            sb.append("/");
            sb.append(longitude);
            sb.append("/");
            sb.append(timestamp);
            sb.append("/");
            sb.append(building);
            sb.append("/");
            context.write(new Text(sb.toString()),value);

        }catch (JSONException e){
            e.printStackTrace();
        }
    }
}

reducer :

public class Reducer extends org.apache.hadoop.mapreduce.Reducer{
    private Text result = new Text();

    protected void reduce(Text key, Iterable<Text> values, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException {


    }
}

更新

public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    private static String BUILDING;
    private static int tsFrom;
    private static int tsTo;
    try {
        JSONArray ja = new JSONArray(key.toString());
        StringBuilder sb;
        for(int n = 0; n < ja.length(); n++)
        {
            JSONObject jo = ja.getJSONObject(n);
            String latitude = jo.getString("latitude");
            String longitude = jo.getString("longitude");
            int timestamp = jo.getInt("timestamp");
            String building = jo.getString("building");



            if (BUILDING.equals(building) && timestamp < tsTo && timestamp > tsFrom) {
                sb = new StringBuilder();
                sb.append(latitude);
                sb.append("/");
                sb.append(longitude);
                context.write(new Text(sb.toString()), value);
            }
        }
    }catch (JSONException e){
        e.printStackTrace();
    }
}

@Override
public void configure(JobConf jobConf) {
    System.out.println("configure");
    BUILDING = jobConf.get("BUILDING");
    tsFrom = Integer.parseInt(jobConf.get("TSFROM"));
    tsTo = Integer.parseInt(jobConf.get("TSTO"));
}

这适用于小型数据集。由于我正在处理大型 json 文件,因此出现 Java 堆空间异常。由于我不熟悉 Hadoop,我很难理解 MapR 如何在不出现 outOfMemoryError 的情况下读取数据。

最佳答案

如果您只是想要一个在 building=something 和 timestamp=somethingelse 约束下的 LONG/LAT 列表。

这是一个简单的过滤操作;为此,您不需要 reducer 。在映射器中,您应该检查当前 JSON 是否满足条件,然后才将其写入上下文。如果它不满足您不希望在输出中出现的条件。

输出应该是 LONG/LAT(没有建筑/时间戳,除非你也想要它们)

如果不存在 reducer,映射器的输出就是作业的输出,这在您的情况下就足够了。

至于代码:

您的驱动程序应使用作业配置将建筑物 ID 和时间戳范围传递给映射器。您放在那里的任何内容都将可供您的所有映射器使用。

Configuration conf = new Configuration();
conf.set("Building", "123");
conf.set("TSFROM", "12300000000");
conf.set("TSTO", "12400000000");
Job job = new Job(conf);

您的映射器类需要实现 JobConfigurable.configure;在那里你将从配置对象读取到局部静态变量

private static String BUILDING;
private static Long tsFrom;
private static Long tsTo;
public void configure(JobConf job) {
    BUILDING = job.get("Building");
    tsFrom = Long.parseLong(job.get("TSFROM"));
    tsTo = Long.parseLong(job.get("TSTO"));
}

现在,您的 map 函数需要检查:

if (BUILDING.equals(building) && timestamp < TSTO && timestamp > TSFROM) {
   sb = new StringBuilder();
   sb.append(latitude);
   sb.append("/");
   sb.append(longitude);
   context.write(new Text(sb.toString()),1);
}

这意味着属于其他建筑物或时间戳之外的任何行都不会出现在结果中。

关于java - Hadoop MapReduce 查询大型 json 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35989792/

相关文章:

java - 如何在 JavaFX 中创建对话框

java - Java中的并发问题

java - 在属性文件中转义 json 字符串

jquery - 从 for 循环中检索值并将它们动态添加到数组中的 JSON

hadoop - 通过 REST API 向外部提交应用程序

java - 提交前的 SQL 外键

java - 关于Java堆空间字符串对象存储的查询

json - swift:无法使用存储的属性 'tableView' 覆盖

hadoop - Hive如何存储数据,什么是SerDe?

hadoop - 如何在 Hadoop 程序中处理大量映射器的键?