java - 当我在 Reducer 中读取它们时,Mapper 中发送的文本/字符串值是错误的

标签 java hadoop mapreduce reduce-reduce-conflict

我正在 Mapper 中发送一些数据,当我尝试在 Reducer 中读取它们时,它们发生了一些变化。在简历中,我使用 set 函数填充数据,然后使用 get 函数在 reducer 中读取它们。我不明白为什么如果我执行 println,数据会不同。

我发送的数据在一个名为“ValorFechaHora”的类中,有 3 个变量 Medicion、Fecha 和 Hora:

public class ValorFechaHora implements Writable {

	private IntWritable Medicion;
	private Text Fecha;
	private Text Hora;
	
	public void ValorFechaHora(){
		
	}
	
	public void ValorFechaHora(IntWritable Medicion, Text Fecha, Text Hora){
		setMedicion(Medicion);
		setFecha(Fecha);
		setHora(Hora);
	}
	
	public IntWritable getMedicion() {
		return Medicion;
	}

	public void setMedicion(IntWritable medicion) {
		Medicion = medicion;
	}

	public Text getFecha() {
		return Fecha;
	}

	public void setFecha(Text fecha) {
		Fecha = fecha;
	}

	public Text getHora() {
		return Hora;
	}

	public void setHora(Text hora) {
		Hora = hora;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((Fecha == null) ? 0 : Fecha.hashCode());
		result = prime * result + ((Hora == null) ? 0 : Hora.hashCode());
		result = prime * result
				+ ((Medicion == null) ? 0 : Medicion.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		ValorFechaHora other = (ValorFechaHora) obj;
		if (Fecha == null) {
			if (other.Fecha != null)
				return false;
		} else if (!Fecha.equals(other.Fecha))
			return false;
		if (Hora == null) {
			if (other.Hora != null)
				return false;
		} else if (!Hora.equals(other.Hora))
			return false;
		if (Medicion == null) {
			if (other.Medicion != null)
				return false;
		} else if (!Medicion.equals(other.Medicion))
			return false;
		return true;
	}

	public void readFields(DataInput in) throws IOException {
		
		Medicion = new IntWritable(in.readInt());
		Fecha = new Text(in.readLine());
		Hora = new Text(in.readLine());
	}
	
	public void write(DataOutput out) throws IOException {
		Medicion.write(out);
		Fecha.write(out);
		Hora.write(out);
	}	

}

在这里你可以看到我的映射器:

public static class LogsMapper extends
			Mapper<LongWritable, Text, Text, ValorFechaHora> {

		//En el mapper emitimos lo que leemos. Key = Dirección MAC. Value = Medición + Fecha + Hora
		
		private Text outKey = new Text();
		private ValorFechaHora outValue = new ValorFechaHora();
		
		@Override
		protected void map(LongWritable offset, Text line, Context context)
				throws IOException, InterruptedException {
			
			
			// Utilizamos row_auxiliar y row para leer los datos correctos (El offset no nos interesa)
			// Ejemplo de dato de entrada tras salir del filtrado básico "2536816	-47dB;8C:3A:E3:92:CB:3E;2014-11-12;14:22:20.795806"
			
			String row_auxiliar[] = line.toString().split("\t");
			String row[] = row_auxiliar[1].split(";");
			
			// Los datos en row quedan... ---> row[0]= Medicion row[1]= MAC row[2]= Fecha row[3]= Hora
	
			//Elegimos la MAC como key
			outKey = new Text(row[1]);
			
			//Elegimos la Medicion, Fecha y Hora como value
			outValue.setMedicion(new IntWritable(Integer.valueOf(row[0].substring(0,3))));
			outValue.setFecha(new Text(row[2]));
			outValue.setHora(new Text(row[3]));
			
			context.write(outKey, outValue);
				
		};

这是我的 reducer :

public static class MaxReducer extends
			Reducer<Text, ValorFechaHora, Text, Text> {
		
		//En el reduce por ahora únicamente contamos el número de veces que ha sido la MAC registrada
		
		protected void reduce(Text MAC,
				Iterable<ValorFechaHora> values, Context context)
				throws IOException, InterruptedException {

			Text outKey = new Text();
			Text outValue = new Text();
			
			outKey = MAC;
			int sum = 0;
			
			for(ValorFechaHora val : values){
				System.out.println("1" + " " + val.getMedicion().toString());
				System.out.println("2" + " " + val.getFecha().toString());
				System.out.println("3" + " " + val.getHora().toString());
				
				sum = sum +1;
			}
			
			outValue = new Text(Integer.toString(sum));
			
			context.write(outKey, outValue);
		};

嗯,我不明白为什么当我在做 bucle 时 reducer 中的变量 val.getFecha().toString() 与 outKey.getFecha().toString 中的变量不同映射器

蒂亚

最佳答案

您使用了错误的方法调用来填充文本对象。您应该使用 Text 对象的 readFields 方法。

目前您正在尝试通过构造函数填充 Text 对象,该构造函数接受 String 作为其参数。您不能只使用 in.readLine 从 DataInput 对象读回一个 String,因为 Text 对象在没有终止换行符的情况下被序列化到数据流。

要解决这个问题,您应该重新使用您应该初始化您的变量,然后只使用 readFields 方法(这可能会在您的代码中产生其他链式 react ,因为您当前没有使用对象重用模式(这比为每个 K/V 对象创建新对象更有效):

private IntWritable Medicion = new IntWritable();
private Text Fecha = new Text();
private Text Hora = new Text();

public void readFields(DataInput in) {
    Medicion.readFields(in);
    Fecha.readFields(in);
    Hora.readFields(in);
}

否则,要保持您的代码不变(但效率较低),只需按如下方式更新 readFields 方法:

public void readFields(DataInput in) {
    Medicion = new Text();
    Medicion.readFields(in);

    Fecha = new Text();
    Fecha.readFields(in);

    Hora = new Text();
    Hora.readFields(in);
}

关于java - 当我在 Reducer 中读取它们时,Mapper 中发送的文本/字符串值是错误的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27845859/

相关文章:

java - 解析直播广播的 RTSP 流

hadoop - 如何在Hive中创建随机分区的数据

hadoop - 使用LoadIncrementalHFiles和子目录进行批量加载

hadoop - 按 Pig Latin 中的最后一列过滤 - 当未指定架构时

hadoop - 运行 hadoop wordcount 示例失败

hadoop - 如何使用Hadoop MapReduce将数据从AWS S3导入HDFS

java - Hibernate Dao 服务而不是repository.JpaRepository

java - Canvas 上文本的持续时间

java - 有人有处理 Nuance 的 OmniPage SDK 的 XML 格式的经验吗?

apache - MR1 和 MR2 有什么区别?