java - Hadoop reducer 接收到错误的数据

标签 java hadoop mapreduce hadoop-partitioning

我同时运行了很多 JobControls,它们都具有相同的一组 ControlledJobs。每个 JobControl 按日期范围处理一组不同的输入/输出文件,但它们都是类型。我观察到的问题是,reduce 步骤正在接收设计为由处理不同日期范围的 reducer 处理的数据。日期范围由 Job 设置,用于确定输入和输出,并从 reducer 中的上下文中读取。

如果我按顺序提交 JobControls,这将停止,但这并不好。这是我应该用自定义分区程序解决的问题吗?如果我不知道哪个 reducer 正在处理我当前的日期范围,我什至如何确定 key 的正确 reducer?为什么实例化的 reducer 不会锁定到它们的 JobControl?

我已经根据 Java 中的基本实现编写了所有的 JobControls、Jobs、Maps 和 Reduces。

我正在使用 2.0.3-alpha 和 yarn。这跟它有什么关系吗?

我在共享代码时必须小心谨慎,但这里有一个经过净化的映射器:

protected void map(LongWritable key, ProtobufWritable<Model> value, Context context) 
    throws IOException, InterruptedException {
  context.write(new Text(value.get().getSessionId()), 
                new ProtobufModelWritable(value.get()));
}

和 reducer :

protected void reduce(Text sessionId, Iterable<ProtobufModelWritable> models, Context context) 
     throws IOException, InterruptedException {
  Interval interval = getIntervalFromConfig(context);
  Model2 model2 = collapseModels(Iterables.transform(models, TO_MODEL));

  Preconditions.checkArgument(interval.contains(model2.getTimeStamp()), 
      "model2: " + model2 + " does not belong in " + interval);
}

private Interval getIntervalFromConfig(Context context) {
  String i = context.getConfiguration().get(INTERVAL_KEY);
  return Utils.interval(i);
}

最佳答案

作为引用,我用两件事解决了这个问题。最重要的问题是,当我为每个间隔创建单独的作业时,我给它们每个都起了相同的名字。通过将序列化间隔附加到作业名称,Hadoop 知道将映射结果发送到哪些缩减器。

此外,我开始为每个作业创建单独的配置对象,而不是复制初始配置。这可能是不必要的,但至少我知道我不会犯错并开始共享相同的配置对象。

关于java - Hadoop reducer 接收到错误的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15504258/

相关文章:

hadoop - mapreduce 作业未正确设置压缩编解码器

java - 在mapreduce程序中未调用reducer

java - 如何测试从数据库返回给定类型的所有实体的方法?

hadoop - 为初始化指定的未知版本 : 3. 1.0 schemaTool 失败

hadoop - Hadoop中的输入拆分

hadoop - 按 pig 中的相同值对数据包进行分组

java - Spring Data REST 防止 InvalidFormatException

java - 高度转换 - 厘米到英尺和英寸(反之亦然)

java - 如何退出 javafx 中的时间线?

hadoop - 有什么建议可以同时将两个不同的数据集读入 Hadoop?