java - 在 Mapper 中检索当前行的文件名

标签 java hadoop mapreduce bigdata

我使用的是 Hadoop 2.6.4 版。我正在编写一个 MapReduce 作业,它将采用 3 个参数,即 -Keyword,输入文件和输出文件的路径。 我理想的输出应该是所有包含关键字的文件的名称。 简单的逻辑是遍历文本中的每一行并将其与我们的关键字匹配。如果它返回 true 打印文件名。

经过广泛的谷歌搜索后,我找到了 3 个获取文件名的选项

  1. Context.getConfiguration().get("map.input.file")
  2. Context.getConfiguration().get("mapreduce.map.input.file")

这两种方法都返回了一个值为“null”的字符串,即它们在我的终端屏幕上打印了“null”。

  1. 最后我从 site.google.com 尝试了这个

    public Path filesplit;
    filesplit=((FileSplit)context.getInputSplit()).getPath();
    System.out.println(filesplit.getName())
    

上述方法产生了错误。终端输出看起来像这样:-

java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast to org.apache.hadoop.mapred.FileSplit
    at org.myorg.DistributedGrep$GrepMapper.map(DistributedGrep.java:23)
    at org.myorg.DistributedGrep$GrepMapper.map(DistributedGrep.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
  1. 有人可以建议解决这些错误的方法吗?可能出了什么问题,我有什么错误吗?

  2. 或者您有任何其他想法来获取映射器正在执行的当前行的文件名。

如果你想全面了解这里的代码,它是

package org.myorg;

import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DistributedGrep {

    public static class GrepMapper extends
            Mapper<Object, Text, NullWritable, Text> {
        public  Path filesplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            filesplit = ((FileSplit)context.getInputSplit()).getPath();

            String txt = value.toString();
            String mapRegex = context.getConfiguration().get("mapregex");
//          System.out.println(context.getConfiguration().get("mapreduce.map.input.file");
            System.out.println(filesplit.getName());
            if (txt.matches(mapRegex)) {
                System.out.println("Matched a line");
                context.write(NullWritable.get(), value);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set("mapregex", args[0]);

        Job job = new Job(conf, "Distributed Grep");
        job.setJarByClass(DistributedGrep.class);
        job.setMapperClass(GrepMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // Set number of reducers to zero
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

最佳答案

选项 1 和 2

Context.getConfiguration().get("map.input.file") Context.getConfiguration().get("mapreduce.map.input.file")

我相信这两个都返回 null 因为它们应该与旧的 mapred API 及其 JobConf 配置对象一起使用。您的 #3 选项是为 mapreduce API 执行此操作的方式。

使用 mapred API,您可以执行以下操作:

public void configure(JobConf job) {
  inputFile = job.get(JobContext.MAP_INPUT_FILE);
}

此处显示:https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapred/Mapper.html

JobContext.MAP_INPUT_FILE 常量值以前是 map.input.file 并更改为 mapreduce.map.input.file一些点。

https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

选项 3

您正在混合使用 MapReduce API。有两个 mapredmapreduce API。

您可以在收到的错误中看到这一点:

Caused by: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.FileSplit 无法转换为 org.apache.hadoop.mapred.FileSplit

您已导入:

导入 org.apache.hadoop.mapred.FileSplit

它来自 mapred API,但您使用的是 mapreduce API。将导入更改为:

导入 org.apache.hadoop.mapreduce.lib.input.FileSplit;

您的代码中的一个线索(除了导入之外)已经发生了这种情况,即您需要添加强制转换以使代码编译:

filesplit = ((FileSplit)context.getInputSplit()).getPath();

这应该是:

filesplit = context.getInputSplit().getPath();

关于java - 在 Mapper 中检索当前行的文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37805250/

相关文章:

regex - 在mongo中执行优先级查询

java - 如何从列表中删除所有重复项

java - Java中有随机图生成器库吗?

java - 删除字符串中的 ASCII 字符

hadoop - 在EC2上安装hadoop

java - MapReduce:Mapper和Reducer可以共享变量吗?

java - 如何在 MacOS 10.5.6 上运行 Eclipse 3.4.1?

java - hadoop 多节点集群 - 从节点无法执行 mapreduce 任务

oracle - 从Oracle将数据导入Hive时Sqoop作业卡住了

javascript - 两个看似相同的 MapReduce 函数的令人费解的行为