我正在尝试逐行读取hdfs文件,然后创建hdfs文件并逐行写入。我使用的代码如下所示:
Path FileToRead=new Path(inputPath);
FileSystem hdfs = FileToRead.getFileSystem(new Configuration());
FSDataInputStream fis = hdfs.open(FileToRead);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
String line;
line = reader.readLine();
while (line != null){
String[] lineElem = line.split(",");
for(int i=0;i<10;i++){
MyMatrix[i][Integer.valueOf(lineElem[0])-1] = Double.valueOf(lineElem[i+1]);
}
line=reader.readLine();
}
reader.close();
fis.close();
Path FileToWrite = new Path(outputPath+"/V");
FileSystem fs = FileSystem.get(new Configuration());
FSDataOutputStream fileOut = fs.create(FileToWrite);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOut));
writer.write("check");
writer.close();
fileOut.close();
在我的outputPath文件中运行此代码时,尚未创建V。但是,如果我将读取部分替换为写入部分,则会创建文件并检查是否写入其中。
谁能帮助我了解如何正确使用它们,以便能够先读取整个文件,然后逐行写入文件?
我也尝试了另一种代码来读取一个文件并写入另一个文件,但是该文件将被创建,但是没有写入任何内容!
我用这样的东西:
hadoop jar main.jar program2.Main input output
然后在我的第一份工作中,我从arg [0]中读取并使用map reduce类将其写入args [1] +“/ NewV”中的文件,并且可以正常工作。
在另一类(非映射归约)中,我使用args [1] +“/ NewV”作为输入路径,并使用output +“/ V_0”作为输出路径(我将这些字符串传递给构造函数)。这是该类的代码:
public class Init_V {
String inputPath, outputPath;
public Init_V(String inputPath, String outputPath) throws Exception {
this.inputPath = inputPath;
this.outputPath = outputPath;
try{
FileSystem fs = FileSystem.get(new Configuration());
Path FileToWrite = new Path(outputPath+"/V.txt");
Path FileToRead=new Path(inputPath);
BufferedWriter output = new BufferedWriter
(new OutputStreamWriter(fs.create(FileToWrite,
true)));
BufferedReader reader = new
BufferedReader(new InputStreamReader(fs.open(FileToRead)));
String data;
data = reader.readLine();
while ( data != null )
{
output.write(data);
data = reader.readLine();
}
reader.close();
output.close(); }catch(Exception e){
}
}
}
最佳答案
我认为,您需要了解hadoop如何正常工作。在hadoop中,许多事情都是由系统完成的,您只是提供输入和输出路径,然后如果路径有效,则由hadoop打开和创建它们。检查以下示例;
public int run (String[] args) throws Exception{
if(args.length != 3){
System.err.println("Usage: MapReduce <input path> <output path> ");
ToolRunner.printGenericCommandUsage(System.err);
}
Job job = new Job();
job.setJarByClass(MyClass.class);
job.setNumReduceTasks(5);
job.setJobName("myclass");
FileInputFormat.addInputPath(job, new Path(args[0]) );
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0:1 ;
}
/* ----------------------main---------------------*/
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new MyClass(), args);
System.exit(exitCode);
}
如您在此处看到的,您仅初始化必要的变量,而读写则由hadoop完成。
另外,在 Mapper类中,您在 map 内说
context.write(key, value)
,类似地,在 Reduce类中,您也在做同样的事情,它会为您编写。如果使用BufferedWriter / Reader,它将写入本地文件系统而不是HDFS。要查看HDFS中的文件,应编写
hadoop fs -ls <path>
,通过ls
命令查找的文件位于本地文件系统中编辑:为了使用读/写,您应该了解以下内容:假设您的hadoop网络中有N台计算机。当您要阅读时,您将不知道正在读取哪个映射器,类似于书写。因此,所有的映射器和化简器都应该具有那些路径,以免产生异常。
我不知道是否可以使用任何其他类,但是由于特定的原因,您可以使用两种方法:
startup
和cleanup
。这些方法在每个映射中仅使用一次,并减少工作量。因此,如果您想读写,可以使用该文件。读写与普通的Java代码相同。例如,您想为每个键看到一些内容,并想将其写入txt。您可以执行以下操作://in reducer
BufferedReader bw ..;
void startup(...){
bw = new ....;
}
void reduce(...){
while(iter.hasNext()){ ....;
}
bw.write(key, ...);
}
void cleanup(...){
bw.close();
}
关于hadoop - BufferedReader和Bufferedwriter用于读取和写入HDFS文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16510672/