hadoop - InvocationTargetException in a Hadoop YARN task

Tags: hadoop mapreduce apache-kafka hadoop-yarn apache-apex

When running Kafka -> Apache Apex -> HBase, the following exception appears in the YARN task:

com.datatorrent.stram.StreamingAppMasterService: Application master, appId=4, clustertimestamp=1479188884109, attemptId=2
2016-11-15 11:59:51,068 INFO org.apache.hadoop.service.AbstractService: Service com.datatorrent.stram.StreamingAppMasterService failed in state INITED; cause: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:130)
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:156)
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:241)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:333)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:330)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:330)
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:444)
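Note that `InvocationTargetException` is only a reflection wrapper: `AbstractFileSystem.newInstance` invokes a file-system constructor reflectively, and the real error is whatever that constructor threw, reachable via `getCause()`. A minimal, self-contained sketch of that unwrapping (the `FailingFs` class and its message are hypothetical stand-ins for a file system that fails to initialize):

```java
import java.lang.reflect.InvocationTargetException;

public class Unwrap {
    // Hypothetical class whose constructor throws, mimicking a file system
    // implementation failing to initialize during reflective construction
    static class FailingFs {
        FailingFs() { throw new IllegalStateException("bad fs.defaultFS setting"); }
    }

    // Reflectively construct FailingFs and return the message of the real cause
    public static String unwrap() {
        try {
            FailingFs.class.getDeclaredConstructor().newInstance();
            return "no exception";
        } catch (InvocationTargetException e) {
            // The wrapped cause is the actual error to look for in the YARN logs
            return e.getCause().getMessage();
        } catch (ReflectiveOperationException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(unwrap());
    }
}
```

So when hunting through the application master logs, look past the `InvocationTargetException` line for the nested `Caused by:` entry.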

My DataTorrent log shows the following exception. I am running an application that communicates with the Kafka -> Apex -> HBase streaming application.

 Connecting to ResourceManager at hduser1/127.0.0.1:8032
 16/11/15 17:47:38 WARN client.EventsAgent: Cannot read events for application_1479208737206_0008: java.io.FileNotFoundException: File does not exist: /user/hduser1/datatorrent/apps/application_1479208737206_0008/events/index.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1893)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1834)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1814)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1786)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:552)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:362)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034)

Adding the code:

public void populateDAG(DAG dag, Configuration conf) {
  KafkaSinglePortInputOperator in =
      dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
  in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());

  LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

  dag.addStream("data", in.outputPort, out.input);
}
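For reference, the Kafka input operator also needs its topic and broker list configured, and the file output operator needs a target directory. Apex typically reads these from `META-INF/properties.xml`; a sketch under assumed property names and placeholder values (adjust the topic, broker list, and paths to your cluster, and check the property keys against your operator version):

```xml
<configuration>
  <!-- hypothetical values; dt.operator.<name>.prop.<field> maps to operator setters -->
  <property>
    <name>dt.operator.kafkaIn.prop.topics</name>
    <value>myTopic</value>
  </property>
  <property>
    <name>dt.operator.kafkaIn.prop.clusters</name>
    <value>localhost:9092</value>
  </property>
  <property>
    <name>dt.operator.fileOut.prop.filePath</name>
    <value>/tmp/fileOut</value>
  </property>
  <property>
    <name>dt.operator.fileOut.prop.baseName</name>
    <value>lines.txt</value>
  </property>
</configuration>
```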

where LineOutputOperator extends AbstractFileOutputOperator:

private static final String NL = System.lineSeparator();
private static final Charset CS = StandardCharsets.UTF_8;

@NotNull
private String baseName;

@Override
public byte[] getBytesForTuple(byte[] t) {
  String result = new String(t, CS) + NL;
  return result.getBytes(CS);
}

@Override
protected String getFileName(byte[] tuple) {
  return baseName;
}

public String getBaseName() { return baseName; }
public void setBaseName(String v) { baseName = v; }

How can I resolve this?

Thanks.

Best Answer

Could you share some details about your environment, such as the versions of Hadoop and Apex? Also, which log does this exception appear in?

As a simple sanity check, could you run the application generated by the basic maven archetype described at http://docs.datatorrent.com/beginner/?

If that works, try running the fileIO and kafka applications at https://github.com/DataTorrent/examples/tree/master/tutorials

If those run fine, we can look into the details of your code.

Regarding "hadoop - InvocationTargetException in a Hadoop YARN task", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40603536/
