java - 如何从Java将级联作业提交到远程YARN群集?

标签 java hadoop mapreduce yarn cascading

我知道我可以通过将级联作业打包到JAR中来提交级联作业,如级联用户指南中所述。如果我使用hadoop jar CLI命令手动提交,则该作业将在我的集群上运行。

但是,在原始Hadoop 1级联版本中,可以通过在Hadoop JobConf上设置某些属性来向集群提交作业。设置fs.defaultFSmapred.job.tracker会导致本地Hadoop库自动尝试将作业提交到Hadoop1 JobTracker。但是,设置这些属性在新版本中似乎不起作用。与服务器协商时,使用Cascading 2.5.3版(将CDH5列为受支持的平台)提交到CDH5 5.2.1 Hadoop集群会导致IPC异常,如下所述。

我相信,这种平台组合-级联2.5.6,Hadoop 2,CDH 5,YARN和MR1 API可以提交-是基于compatibility table的受支持组合(请参见“先前版本”标题下)。并且使用hadoop jar提交作业在同一集群上也可以正常工作。在提交主机和ResourceManager之间开放了端口8031。在服务器端的ResourceManager日志中发现带有相同消息的错误。

我正在使用cascading-hadoop2-mr1库。

Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
    at WordCount.main(WordCount.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
    at org.apache.hadoop.ipc.Client.call(Client.java:1411)
    at org.apache.hadoop.ipc.Client.call(Client.java:1364)
    at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
    at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
    at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

演示代码如下,其基本与Cascading用户指南中的WordCount示例相同。

public class WordCount {

    public static void main(String[] args) {
        String inputPath = "/user/vagrant/wordcount/input";
        String outputPath = "/user/vagrant/wordcount/output";

        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, inputPath );

        Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

        Pipe assembly = new Pipe( "wordcount" );


        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator( new Fields( "word" ), regex );
        assembly = new Each( assembly, new Fields( "line" ), function );


        assembly = new GroupBy( assembly, new Fields( "word" ) );

        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );

        Properties properties = AppProps.appProps()
            .setName( "word-count-application" )
            .setJarClass( WordCount.class )
            .buildProperties();

        properties.put("fs.defaultFS", "hdfs://192.168.30.101");
        properties.put("mapred.job.tracker", "192.168.30.101:8032");

        FlowConnector flowConnector = new HadoopFlowConnector( properties );
        Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

        flow.complete();
    }
}

我也尝试设置其他属性以使其正常工作:
  • mapreduce.jobtracker.address
  • mapreduce.framework.name
  • yarn.resourcemanager.address
  • yarn.resourcemanager.host
  • yarn.resourcemanager.hostname
  • yarn.resourcemanager.resourcetracker.address

  • 这些都不起作用,它们只会导致作业以本地模式运行(除非还设置了mapred.job.tracker)。

    最佳答案

    我现在已经解决了这个问题。它来自尝试使用Cloudera分发的旧Hadoop类,尤其是JobClient。如果您使用提供的hadoop-core版本的2.5.0-mr1-cdh5.2.1或具有相同版本号的hadoop-client依赖项,就会发生这种情况。尽管此版本声称是MR1版本,并且我们正在使用MR1 API进行提交,但该版本实际上仅支持向Hadoop1 JobTracker的提交,并且不支持YARN。

    为了允许提交到YARN,您必须将hadoop-client依赖项与非MR1 2.5.0-cdh5.2.1版本一起使用,该版本仍支持将MR1作业提交到YARN。

    关于java - 如何从Java将级联作业提交到远程YARN群集?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27606807/

    相关文章:

    java - 红黑树 - 如何找到节点的父节点?

    java - Dropwizard - 如何从 View 进行服务器端重定向?

    hadoop - 在Hive Shell执行查询中,从emp中选择*,将所有值都设置为null

    hadoop - 为什么hadoop不允许使用VIM编辑文件?

    hadoop - 用于实验的免费 Hadoop 集群

    hadoop - 在mapreduce中,reducer如何找到要拉出的 map 输出分区

    java - Ant 构建无效目标版本中的问题

    java - Selenium 找不到元素

    javascript - MongoDB mapReduce 多键值计数问题

    java - MapReduce 程序映射任务超时