algorithm - Hadoop MapReduce: how to overwrite the input txt file read by the mapper with the MapReduce output?

Tags: algorithm hadoop mapreduce bigdata k-means

I am trying to build a MapReduce program that runs the k-means algorithm. I know that MapReduce is not the best fit for iterative algorithms.
I have created the mapper and reducer classes.
In the mapper code I read an input file. When the MapReduce job completes, I want the results stored back in that same input file. How can I make the output file overwrite the file that was fed to the mapper?
I also want to iterate the MapReduce job until the values in the old input file and the new one converge, i.e. until the difference between the values is less than 0.1.

My code is:

// kmeansMapper.java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, DoubleWritable> {

    private final static String centroidFile = "centroid.txt";
    private List<Double> centers = new ArrayList<Double>();

    @Override
    public void setup(Context context) throws IOException {
        // Read the current centers, one per line, from the cached centroid file
        BufferedReader br = new BufferedReader(new FileReader(centroidFile));
        String contentLine;
        while ((contentLine = br.readLine()) != null) {
            centers.add(Double.parseDouble(contentLine));
        }
        br.close();
    }

    @Override
    public void map(Object key, Text input, Context context)
            throws IOException, InterruptedException {
        // Fields are separated by two spaces; the third field is the rating
        String[] fields = input.toString().split("  ");
        Double rating = Double.parseDouble(fields[2]);

        // Find the center closest to this rating
        // (the initial distance also needs Math.abs, a bug in the original)
        Double distance = Math.abs(centers.get(0) - rating);
        int position = 0;
        for (int i = 1; i < centers.size(); i++) {
            Double cDistance = Math.abs(centers.get(i) - rating);
            if (cDistance < distance) {
                position = i;
                distance = cDistance;
            }
        }
        Double closestCenter = centers.get(position);
        // Emit the closest center and the rating value
        context.write(new DoubleWritable(closestCenter), new DoubleWritable(rating));
    }
}
// kmeansReducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, DoubleWritable, Text> {

    @Override
    public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Average the ratings assigned to this center to get the new center,
        // and keep the ratings themselves concatenated into one string
        Iterator<DoubleWritable> v = values.iterator();
        double total = 0;
        double count = 0;
        String value = ""; // the ratings
        while (v.hasNext()) {
            double i = v.next().get();
            value = value + " " + Double.toString(i);
            total = total + i;
            ++count;
        }
        double nCenter = total / count;
        context.write(new DoubleWritable(nCenter), new Text(value));
    }
}
// run.java
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class run {

    public static void runJob(String[] input, String output) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Ship the current centroid file to every mapper via the distributed cache
        Path toCache = new Path("input/centroid.txt");
        job.addCacheFile(toCache.toUri());

        job.setJarByClass(run.class);
        job.setMapperClass(kmeansMapper.class);
        job.setReducerClass(kmeansReducer.class);
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setNumReduceTasks(1);

        Path outputPath = new Path(output);
        FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Clear any previous output so the job can run again
        outputPath.getFileSystem(conf).delete(outputPath, true);
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        runJob(Arrays.copyOfRange(args, 0, args.length - 1), args[args.length - 1]);
    }
}

Thanks

Best Answer

I know you put in a disclaimer... but please switch to Spark or another framework that can solve this problem in memory. Your life will be much better.
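
For a sense of the difference, here is a minimal sketch of the same clustering with Spark's built-in KMeans from spark.ml; the input path, the libsvm data format, and the value of k are placeholder assumptions, not part of the original question:

// Sketch only: Spark's spark.ml KMeans; path, data format, and k are assumptions
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkKMeansSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("kmeans").getOrCreate();
        // Expects libsvm-formatted features; adapt the loader to your ratings file
        Dataset<Row> data = spark.read().format("libsvm").load("input/ratings.txt");
        // setTol plays the role of the 0.1 convergence threshold in the question
        KMeans kmeans = new KMeans().setK(3).setSeed(1L).setTol(0.1);
        KMeansModel model = kmeans.fit(data);
        for (Vector center : model.clusterCenters()) {
            System.out.println(center);
        }
        spark.stop();
    }
}

The iteration happens inside fit, in memory, so there is no juggling of intermediate HDFS files between passes.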

If you really want to do it this way, simply run the code in runJob iteratively, using a temporary file name for the input. You can see this question on moving files in hadoop to achieve this. You will need a FileSystem instance and a temporary file:

FileSystem fs = FileSystem.get(new Configuration());
Path tempInputPath = new Path("/user/th/kmeans/tmp_input");

Broadly speaking, after each iteration finishes, execute

fs.delete(tempInputPath, true);
fs.rename(outputPath, tempInputPath);
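
Note that rename moves the whole job output directory under tempInputPath, so the next iteration's input directory will contain the part-r-00000 file written by the reducer.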

Of course, for the first iteration you must use the input path supplied when the job is run. Subsequent iterations can use tempInputPath, which will be the output of the previous iteration. A sketch of such a driver loop follows.
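
Putting it together, a minimal sketch of the iterative driver with the 0.1 convergence check from the question. The readCentroids helper, the data path, the iteration cap, and the step that rewrites the cached centroid.txt are all assumptions for illustration, not part of the original answer:

// Sketch only: iterate runJob until every center moves less than 0.1
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IterativeDriver {

    // Assumed helper: parses the first whitespace-separated token of each
    // line as a center (the reducer writes "center<TAB>ratings...")
    static List<Double> readCentroids(FileSystem fs, Path p) throws IOException {
        List<Double> centers = new ArrayList<Double>();
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
        String line;
        while ((line = br.readLine()) != null) {
            centers.add(Double.parseDouble(line.trim().split("\\s+")[0]));
        }
        br.close();
        return centers;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path tempInputPath = new Path("/user/th/kmeans/tmp_input");
        Path outputPath = new Path("/user/th/kmeans/output");
        Path centroidPath = new Path("input/centroid.txt");

        List<Double> old = readCentroids(fs, centroidPath);
        boolean converged = false;

        for (int iter = 0; iter < 20 && !converged; iter++) { // hard cap in case it never converges
            // First iteration reads the original input; later ones read the previous output
            String in = (iter == 0) ? "input/data.txt" : tempInputPath.toString();
            run.runJob(new String[] { in }, outputPath.toString());

            // Converged when every center moved less than 0.1
            // (assumes the number and order of centers stay stable across iterations)
            List<Double> fresh = readCentroids(fs, new Path(outputPath, "part-r-00000"));
            converged = fresh.size() == old.size();
            for (int i = 0; converged && i < old.size(); i++) {
                if (Math.abs(fresh.get(i) - old.get(i)) >= 0.1) {
                    converged = false;
                }
            }
            old = fresh;

            // The mapper reads centers from the cached centroid.txt, so rewrite it
            // with the new centers (a step the original answer does not spell out)
            FSDataOutputStream out = fs.create(centroidPath, true);
            for (Double c : fresh) {
                out.writeBytes(c + "\n");
            }
            out.close();

            // Promote this iteration's output to be the next iteration's input
            fs.delete(tempInputPath, true);
            fs.rename(outputPath, tempInputPath);
        }
    }
}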

Regarding "algorithm - Hadoop MapReduce: how to overwrite the input txt file read by the mapper with the MapReduce output?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47686966/
