hadoop - How to use JobControl in Hadoop

Tags: hadoop

I want to merge two files into one.
I made two mappers to read them, and one reducer to join them.

    JobConf classifiedConf = new JobConf(new Configuration());
    classifiedConf.setJarByClass(myjob.class);
    classifiedConf.setJobName("classifiedjob");
    FileInputFormat.setInputPaths(classifiedConf, classifiedInputPath);
    classifiedConf.setMapperClass(ClassifiedMapper.class);
    classifiedConf.setMapOutputKeyClass(TextPair.class);
    classifiedConf.setMapOutputValueClass(Text.class);
    Job classifiedJob = new Job(classifiedConf);
    // first mapper config

    JobConf featureConf = new JobConf(new Configuration());
    featureConf.setJobName("featureJob");
    featureConf.setJarByClass(myjob.class);
    FileInputFormat.setInputPaths(featureConf, featuresInputPath);
    featureConf.setMapperClass(FeatureMapper.class);
    featureConf.setMapOutputKeyClass(TextPair.class);
    featureConf.setMapOutputValueClass(Text.class);
    Job featureJob = new Job(featureConf);
    // second mapper config

    JobConf joinConf = new JobConf(new Configuration());
    joinConf.setJobName("joinJob");
    joinConf.setJarByClass(myjob.class);
    joinConf.setReducerClass(JoinReducer.class);
    joinConf.setOutputKeyClass(Text.class);
    joinConf.setOutputValueClass(Text.class);
    Job joinJob = new Job(joinConf);
    // reducer config

    // JobControl config
    joinJob.addDependingJob(featureJob);
    joinJob.addDependingJob(classifiedJob);
    secondJob.addDependingJob(joinJob);
    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(classifiedJob);
    jobControl.addJob(featureJob);
    jobControl.addJob(secondJob);

    Thread thread = new Thread(jobControl);
    thread.start();
    while (jobControl.allFinished()) {
        jobControl.stop();
    }

But I get this message:
WARN mapred.JobClient:
Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

Can anyone help...

Best Answer

Which version of Hadoop are you using?

Does the warning you get actually stop the program?

You don't need to use setJarByClass(). You can see my snippet below; it runs without calling the setJarByClass() method.

JobConf job = new JobConf(PageRankJob.class);
job.setJobName("PageRankJob");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);

job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

JobClient.runJob(job);
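Separately from the jar warning, the monitoring loop in the question is inverted: `while (jobControl.allFinished()) { jobControl.stop(); }` exits immediately while the jobs are still running (and, as posted, `joinJob` is never added to the JobControl at all). The usual shape is to start the JobControl thread, poll until `allFinished()` returns true, and only then call `stop()`. Below is a self-contained sketch of that polling shape using a plain Thread and a flag as a stand-in for Hadoop's JobControl; the class and method names here are illustrative, not part of the Hadoop API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PollUntilFinished {
    // Stand-in for the JobControl pattern: start background work,
    // poll until it reports completion, then clean up.
    static boolean runUntilFinished() {
        AtomicBoolean allFinished = new AtomicBoolean(false);

        // Stand-in for new Thread(jobControl).start() in the question.
        Thread runner = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate the jobs running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            allFinished.set(true); // simulate allFinished() becoming true
        });
        runner.start();

        try {
            // Correct shape: poll while NOT finished, then stop.
            // The question's loop has this condition inverted.
            while (!allFinished.get()) {
                Thread.sleep(50);
            }
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("all finished: " + runUntilFinished());
    }
}
```

With the real API the equivalent would be along the lines of `while (!jobControl.allFinished()) { Thread.sleep(500); } jobControl.stop();` after starting the JobControl thread.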

Regarding hadoop - How to use JobControl in Hadoop, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/11840589/
