java - 两个具有多个数据源的映射器

标签 java hadoop mapreduce hbase mapper

我写了2个映射器Map1和Map2

Map1-读取HDFS中的seq文件并对其进行处理。

Map2-从HBASE读取并产生与Map1相同的键值对。

最后,我将它们合并到ReducerAll中。

问题是只有一个映射程序正在运行,并且该作业可以完成而不会发生任何类型的错误。只有最后一个映射器正在运行(即TableMapReduceUtil)。如果我交换行TableMapReduceUtilMultipleInputs,则最后一个即MultipleInputs映射器运行。

我在这里做错了什么?两种情况都不会引发任何错误。我还使用addCacheFile()读取了2个文件进行处理,但是我猜这里没关系。

Job job3 = Job.getInstance(config, "Test");
if (true) {


  job3.setJarByClass(Main.class);


  job3.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job3.setMapOutputValueClass(ImmutableBytesWritable.class);
  job3.setOutputKeyClass(ImmutableBytesWritable.class);
  job3.setOutputValueClass(ImmutableBytesWritable.class);


  job3.getConfiguration().set("StartDate", c_startDate);
  job3.getConfiguration().set("EndDate", c_endDate);


  job3.addCacheFile(new URI(args[8]));
  job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1));

  job3.addCacheFile(new URI(args[9]));
  job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1));
  job3.setReducerClass(ReducerAll.class);
  job3.setOutputFormatClass(SequenceFileOutputFormat.class);

  job3.setNumReduceTasks(10);

  Scan scan = new Scan();
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable"));
  scan.setCaching(300);
  scan.setCacheBlocks(false);

  MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
  TableMapReduceUtil.initTableMapperJob(
          "hbasetable",
          scan,
          Map2.class,
          ImmutableBytesWritable.class,
          ImmutableBytesWritable.class,
          job3);


  FileOutputFormat.setOutputPath(job3, new Path(args[7]));
  job3.waitForCompletion(true);
  if (!job3.waitForCompletion(true)) {
    return (1);
  }

最佳答案

我认为该行为是由于以下两行:

MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
 TableMapReduceUtil.initTableMapperJob(
      "hbasetable",
      scan,
      Map2.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job3); 
  • 只有一个作业 job3
  • 尽管您已经提到有两个映射器,但请查看映射器类型。 Map1中的映射器将与Map2不同。 Map1MapperMap2TableMapper
  • 将这两个语句放在一起并不意味着它们本质上包含在 job3 的MultipleInputs设置中。 MultipleInputs仍然只有一个Map1 设置。 Map2的其他设置仍然是单独的。
  • 现在执行。两种配置中的后者MultipleInputs或TableMapReduceUtil覆盖 job3 中的前一个,因此只有一个映射器执行

  • 附言:-请让我知道这是否不正确,我尚未证实我在机器上介绍的理解。

    关于java - 两个具有多个数据源的映射器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36698737/

    相关文章:

    Hadoop mapreduce getMapOutput 失败

    hadoop - mapreduce:可以减少阶段 "emit"吗?

    java - spring boot 2.1.7 默认登录不成功

    Java servlet doGet() 在第一次调用后不刷新

    java - mvn azure 函数 :deploy creates 'not valid storage account name'

    mongodb - 使用 mongodb 聚合框架按数组长度分组

    Windows 上的 Python Hadoop 流,脚本不是有效的 Win32 应用程序

    java - 在 JAVA 中解析具有混合已知/未知字段的 JSON

    hadoop - Apache Pig脚本输出文件

    hadoop - 在本地计算机上练习Hadoop