java - Accumulo MapReduce 作业因 java.io.EOFException 而失败,使用 AccumuloRowInputFormat

标签 java hadoop hadoop2 accumulo

我所有的映射器都失败了,除了下面的异常(exception)。为了简洁起见,我只展示了最后一次失败。

为什么会发生这种情况,我该如何解决?

16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient:   Job Counters
16/09/21 17:02:00 INFO mapred.JobClient:     Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient:     Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient:     Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient:     Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

我使用 Accumulo 表作为我的输入数据。我的设置如下:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
    conf.set(TYPE_ID_MAP_KEY, idMapFileContent);

    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(DanglingLinksFinderMapper.class);
    job.setReducerClass(DanglingLinksFinderReducer.class);
    this.setupRowInputFormat(job);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path out = new Path(args[0]);
    LOGGER.info("Writing to output directory: " + out.toUri());
    FileOutputFormat.setOutputPath(job, out);

    int exitCode = job.waitForCompletion(true) ? 0 : 1;
}

private Job setupRowInputFormat(Job job)
        throws IOException, AccumuloSecurityException
{
    job.setInputFormatClass(AccumuloRowInputFormat.class);
    Configuration conf = job.getConfiguration();

    AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
    LOGGER.info(connectInfo.toString());

    AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
    AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
    AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
    return job;
}

我正在使用 Hadoop 2.6.0、Accumulo 1.5.0 和 Java 1.7。

前几天我让这个工作正常,但(据我所知)没有改变任何东西。所以我想这可能与我运行它的服务器上的配置或数据状态有关?该作业在我本地计算机上的 Docker 容器中运行的测试表上运行良好,但在我的远程测试服务器上失败。

我可以登录到 accumulo shell 并扫描我正在使用的表。那里一切看起来都很好。我还尝试在测试服务器上运行压缩,它工作正常但没有解决问题。

最佳答案

我猜测您用于启动 MapReduce 作业的 Accumulo jar 与您包含的作业本身通过 DistributedCache 或 libjars CLI 选项使用的 Accumulo jar 版本不匹配。

因为您没有指定任何范围,AccumuloInputFormat 将自动获取表格的所有 Tablet 边界,并创建与表格中的 Tablets 数量相同的 RangeInputSplit 对象。这种拆分创建是在本地 JVM(提交作业时创建的 JVM)中完成的。这些 RangeInputSplit 对象被序列化并传递到 YARN 中。

您提供的错误是当 Mapper 获取这些序列化的 RangeInputSplit 对象之一并尝试反序列化它时。不知何故,这是失败的,因为没有足够的序列化数据来反序列化 Mapper 中运行的 Accumulo 版本期望读取的内容。

这可能只是您的 Accumulo 版本中的一个序列化错误(请分享),但我不记得听说过这样的错误。我猜本地类路径和 Mapper 类路径上的 Accumulo 版本有所不同。

关于java - Accumulo MapReduce 作业因 java.io.EOFException 而失败,使用 AccumuloRowInputFormat,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39622702/

相关文章:

java - 在 JTable 中显示来自数据库的数据

java - 使用 Apache Beam 作为依赖项

hadoop - 从 SequenceFileAsBinaryInputFormat 读取 key

hadoop - 在sqoop中,与--split-limit参数一起使用时 “size”是什么意思

java - 如何在网页的某个框架(以div分隔)上向下滚动?

java - Netbeans + Derby + hibernate

apache-spark - 如何在hadoop服务器中查找已安装的库?

java - hadoop映射器读取多行

hadoop - 想要保持 hadoop 奴隶的用户名@主机名不同

eclipse - 如何使用 Java -jar 命令运行 map reduce 作业