我同时运行了很多 JobControls,它们都具有相同的一组 ControlledJobs。每个 JobControl 按日期范围处理一组不同的输入/输出文件,但它们都是类型。我观察到的问题是,reduce 步骤正在接收设计为由处理不同日期范围的 reducer 处理的数据。日期范围由 Job 设置,用于确定输入和输出,并从 reducer 中的上下文中读取。
如果我按顺序提交 JobControls,这将停止,但这并不好。这是我应该用自定义分区程序解决的问题吗?如果我不知道哪个 reducer 正在处理我当前的日期范围,我什至如何确定 key 的正确 reducer?为什么实例化的 reducer 不会锁定到它们的 JobControl?
我已经根据 Java 中的基本实现编写了所有的 JobControls、Jobs、Maps 和 Reduces。
我正在使用 2.0.3-alpha 和 yarn。这跟它有什么关系吗?
我在共享代码时必须小心谨慎,但这里有一个经过净化的映射器:
protected void map(LongWritable key, ProtobufWritable<Model> value, Context context)
throws IOException, InterruptedException {
context.write(new Text(value.get().getSessionId()),
new ProtobufModelWritable(value.get()));
}
和 reducer :
protected void reduce(Text sessionId, Iterable<ProtobufModelWritable> models, Context context)
throws IOException, InterruptedException {
Interval interval = getIntervalFromConfig(context);
Model2 model2 = collapseModels(Iterables.transform(models, TO_MODEL));
Preconditions.checkArgument(interval.contains(model2.getTimeStamp()),
"model2: " + model2 + " does not belong in " + interval);
}
private Interval getIntervalFromConfig(Context context) {
String i = context.getConfiguration().get(INTERVAL_KEY);
return Utils.interval(i);
}
最佳答案
作为引用,我用两件事解决了这个问题。最重要的问题是,当我为每个间隔创建单独的作业时,我给它们每个都起了相同的名字。通过将序列化间隔附加到作业名称,Hadoop 知道将映射结果发送到哪些缩减器。
此外,我开始为每个作业创建单独的配置对象,而不是复制初始配置。这可能是不必要的,但至少我知道我不会犯错并开始共享相同的配置对象。
关于java - Hadoop reducer 接收到错误的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15504258/