hadoop - Replicated join using the distributed cache in Hadoop 0.20

Tags: hadoop distributed-caching

I have been trying to do a replicated join using the distributed cache, both on a cluster and through the Karmasphere interface. I have pasted the code below. My program cannot find the file in the cache.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// A demonstration of Hadoop's DistributedCache tool

public class MapperSideJoinWithDistributedCache extends Configured implements Tool {

  // Note: this field is never used below; the join file is taken from args[0].
  private final static String inputa = "C:/Users/LopezGG/workspace/Second_join/input1_1";

  public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

    // In-memory copy of the smaller (cached) side of the join, keyed by join key.
    private Hashtable<String, String> joinData = new Hashtable<String, String>();

    @Override
    public void configure(JobConf conf) {
      try {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
        System.out.println("ds" + cacheFiles);
        if (cacheFiles != null && cacheFiles.length > 0) {
          String line;
          String[] tokens;
          BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
          try {
            // Each cached line is "key,value"; split on the first comma only.
            while ((line = joinReader.readLine()) != null) {
              tokens = line.split(",", 2);
              joinData.put(tokens[0], tokens[1]);
            }
          } finally {
            joinReader.close();
          }
        } else {
          System.out.println("joinreader not set");
        }
      } catch (IOException e) {
        System.err.println("Exception reading DistributedCache: " + e);
      }
    }

    public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      // Emit only keys that also appear in the cached side (inner join).
      String joinValue = joinData.get(key.toString());
      if (joinValue != null) {
        output.collect(key, new Text(value.toString() + "," + joinValue));
      }
    }
  }


  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    JobConf job = new JobConf(conf, MapperSideJoinWithDistributedCache.class);

    // args[0]: file to distribute to every mapper via the cache
    DistributedCache.addCacheFile(new Path(args[0]).toUri(), job);

    Path in = new Path(args[1]);
    Path out = new Path(args[2]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("DataJoin with DistributedCache");
    job.setMapperClass(MapClass.class);
    job.setNumReduceTasks(0); // map-only job
    job.setInputFormat(KeyValueTextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.set("key.value.separator.in.input.line", ",");
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    long time1 = System.currentTimeMillis();
    System.out.println(time1);
    int res = ToolRunner.run(new Configuration(),
        new MapperSideJoinWithDistributedCache(), args);
    long time2 = System.currentTimeMillis();
    System.out.println(time2);
    System.out.println("millisecs elapsed: " + (time2 - time1));
    System.exit(res);
  }
}

The error I get is:
INFO mapred.MapTask: numReduceTasks: 0
Exception reading DistributedCache: java.io.FileNotFoundException: \tmp\hadoop-LopezGG\mapred\local\archive\-2564469513526622450_-1173562614_1653082827\file\C\Users\LopezGG\workspace\Second_join\input1_1 (The system cannot find the file specified)
ds[Lorg.apache.hadoop.fs.Path;@366a88bb
12/04/24 23:15:01 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/04/24 23:15:01 INFO mapred.LocalJobRunner: 

but the task runs to completion. Can someone please help? I have looked at the other posts and made all the changes they suggest, but it still doesn't work.

Best Answer

I have to admit that I never use the DistributedCache class myself (instead I use the -files option, which is handled by GenericOptionsParser), and I am not sure that DistributedCache automatically copies a local file into HDFS before the job runs.
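Because the job above already goes through ToolRunner, GenericOptionsParser is applied to the command line automatically, so the -files option works without code changes. A hypothetical invocation (the jar name and paths are made up for illustration; note that args[0] would then be the input directory rather than the cache file, so run() would need a small adjustment):

    hadoop jar myjob.jar MapperSideJoinWithDistributedCache \
        -files /local/path/input1_1 \
        input_dir output_dir

With -files, Hadoop uploads the listed local file to the job's staging area in HDFS and localizes it on every task node, where it can be opened by its plain file name from the task's working directory.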

While I could not find any evidence of this in the Hadoop documentation, the Pro Hadoop book mentions something to that effect:

  • http://books.google.com/books?id=8DV-EzeKigQC&pg=PA133&dq=%22The+URI+must+be+on+the+JobTracker+shared+file+system%22&hl=en&sa=X&ei=jNGXT_LKOKLA6AG1-7j6Bg&ved=0CEsQ6AEwAA#v=onepage&q=%22The%20URI%20must%20be%20on%20the%20JobTracker%20shared%20file%20system%22&f=false

  • In your case, first copy the file to HDFS, then pass the file's HDFS URI when you call DistributedCache.addCacheFile, and see whether that works for you (see the sketch below).
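A minimal sketch of that second suggestion, assuming the driver runs on a machine with HDFS access (the /user/LopezGG/cache/input1_1 target path is hypothetical):

    // Inside run(), before configuring the rest of the job.
    // Requires: import org.apache.hadoop.fs.FileSystem;
    FileSystem fs = FileSystem.get(job);
    Path localJoinFile = new Path(args[0]); // a path on the client machine
    Path hdfsJoinFile = new Path("/user/LopezGG/cache/input1_1"); // hypothetical HDFS target
    // delSrc = false, overwrite = true
    fs.copyFromLocalFile(false, true, localJoinFile, hdfsJoinFile);
    // Register the HDFS copy, not the local path, with the cache.
    DistributedCache.addCacheFile(hdfsJoinFile.toUri(), job);

With the file in HDFS, the TaskTrackers can localize it, and getLocalCacheFiles() in configure() should return a path that actually exists on the task node.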

Original question: hadoop - Replicated join using the distributed cache in Hadoop 0.20, on Stack Overflow: https://stackoverflow.com/questions/10309320/
