我正在尝试创建一个map reduce程序来执行k-means算法。我知道使用map reduce并不是执行迭代算法的最佳方法。
我已经创建了mapper和reducer类。
在映射器代码中,我读取了一个输入文件。当 map 缩小完成后,我希望结果存储在相同的输入文件中。如何使输出文件覆盖从映射器输入的文件?
另外,我使映射减少迭代直到旧输入文件和新输入文件的值收敛,即值之间的差小于0.1
我的代码是:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.FileReader;
import java.io.BufferedReader;
import java.util.ArrayList;
public class kmeansMapper extends Mapper<Object, Text, DoubleWritable,
DoubleWritable> {
private final static String centroidFile = "centroid.txt";
private List<Double> centers = new ArrayList<Double>();
public void setup(Context context) throws IOException{
BufferedReader br = new BufferedReader(new
FileReader(centroidFile));
String contentLine;
while((contentLine = br.readLine())!=null){
centers.add(Double.parseDouble(contentLine));
}
}
public void map(Object key, Text input, Context context) throws IOException,
InterruptedException {
String[] fields = input.toString().split(" ");
Double rating = Double.parseDouble(fields[2]);
Double distance = centers.get(0) - rating;
int position = 0;
for(int i=1; i<centers.size(); i++){
Double cDistance = Math.abs(centers.get(i) - rating);
if(cDistance< distance){
position = i;
distance = cDistance;
}
}
Double closestCenter = centers.get(position);
context.write(new DoubleWritable(closestCenter),new
DoubleWritable(rating)); //outputs closestcenter and rating value
}
}
import java.io.IOException;
import java.lang.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.*;
public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable,
DoubleWritable, Text> {
public void reduce(DoubleWritable key, Iterable<DoubleWritable> values,
Context context)// get count // get total //get values in a string
throws IOException, InterruptedException {
Iterator<DoubleWritable> v = values.iterator();
double total = 0;
double count = 0;
String value = ""; //value is the rating
while (v.hasNext()){
double i = v.next().get();
value = value + " " + Double.toString(i);
total = total + i;
++count;
}
double nCenter = total/count;
context.write(new DoubleWritable(nCenter), new Text(value));
}
}
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class run
{
public static void runJob(String[] input, String output) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
Path toCache = new Path("input/centroid.txt");
job.addCacheFile(toCache.toUri());
job.setJarByClass(run.class);
job.setMapperClass(kmeansMapper.class);
job.setReducerClass(kmeansReducer.class);
job.setMapOutputKeyClass(DoubleWritable.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setNumReduceTasks(1);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath,true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]);
}
}
谢谢
最佳答案
我知道您放了免责声明..但是请切换到Spark或其他可以解决内存中问题的框架。你的生活会好很多。
如果确实要执行此操作,则只需迭代运行runJob中的代码,然后使用临时文件名进行输入。您可以看到this question on moving files in hadoop实现此目的。您需要输入一个FileSystem实例和一个临时文件:
FileSystem fs = FileSystem.get(new Configuration());
Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input';
概括地说,在每次迭代完成后,执行
fs.delete(tempInputPath)
fs.rename(outputPath, tempInputPath)
当然,对于第一次迭代,您必须将输入路径设置为运行作业时提供的输入路径。后续迭代可以使用tempInputPath,这将是先前迭代的输出。
关于algorithm - Hadoop Mapreduce,如何使用map reduce输出重写在映射器中输入的txt文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47686966/