java - Error when running a map-reduce job that reads from Elasticsearch

Tags: java hadoop elasticsearch mapreduce

When I try to run a map-reduce job that reads data from Elasticsearch, I get the following error:

java.lang.Exception: java.lang.RuntimeException: problem advancing post rec#0
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.RuntimeException: problem advancing post rec#0
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1364)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:220)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:216)
    at org.apache.hadoop.mapred.lib.IdentityReducer.reduce(IdentityReducer.java:45)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: can't find class: org.elasticsearch.hadoop.mr.LinkedMapWritable because org.elasticsearch.hadoop.mr.LinkedMapWritable
    at org.apache.hadoop.io.AbstractMapWritable.readFields(AbstractMapWritable.java:212)
    at org.apache.hadoop.io.MapWritable.readFields(MapWritable.java:169)
    at org.elasticsearch.hadoop.mr.LinkedMapWritable.readFields(LinkedMapWritable.java:148)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1421)
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1361)
    ... 12 more
14/09/08 16:18:43 INFO mapreduce.Job: Job job_local1675221004_0001 failed with state FAILED due to: NA

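My main runner class is as follows:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class Es2 {

    private static final Path TMP_DIR = new Path(Es2.class.getSimpleName() + "_TMP_1");

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        // Elasticsearch index/type to read from, and the query to run against it
        conf.set("es.resource", "conceptnet_data/concept");
        conf.set("es.query", "?q=me*");
        conf.setInputFormat(EsInputFormat.class);
        // Keys are document ids; values are the documents as LinkedMapWritable
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(LinkedMapWritable.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LinkedMapWritable.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setMapperClass(mapper1.class);
        final Path outDir = new Path(TMP_DIR, "out");
        FileOutputFormat.setOutputPath(conf, outDir);
        JobClient.runJob(conf);
    }
}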

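The mapper class is as follows:
import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class mapper1 extends MapReduceBase
        implements Mapper<Object, Object, Text, MapWritable> {

    @Override
    public void map(Object key, Object value, OutputCollector<Text, MapWritable> output,
                    Reporter reporter) throws IOException {
        // EsInputFormat supplies the document id as the key and the document as the value
        Text docId = (Text) key;
        MapWritable doc = (LinkedMapWritable) value;
        output.collect(docId, doc);
    }
}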

Please guide me on this issue.

Best answer

I ran into the same problem and solved it by adding the elasticsearch-hadoop jars to Hadoop's classpath.

Try something like this:

export HADOOP_CLASSPATH=/home/tariq/java/library/elasticsearch-hadoop-mr-2.0.2.jar:/home/tariq/java/library/elasticsearch-hadoop-2.0.2.jar
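To confirm that the classpath change took effect, one option is a small check that tries to load the class named in the stack trace. This is only a sketch: the class name ClasspathCheck and the method isOnClasspath are hypothetical helpers made up for illustration, not part of Hadoop or elasticsearch-hadoop, and the check reports only what is visible to the JVM it runs in.

```java
class ClasspathCheck {

    /**
     * Returns true if the named class can be found by this class's class loader.
     * The class is located but not initialized (second argument is false).
     */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its dependencies is missing
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class reported as missing in the stack trace above
        String name = args.length > 0
                ? args[0]
                : "org.elasticsearch.hadoop.mr.LinkedMapWritable";
        String verdict = isOnClasspath(name) ? " is" : " is NOT";
        System.out.println(name + verdict + " on the classpath");
    }
}
```

The check is only meaningful when it runs with the same classpath as the job. Running `hadoop classpath` prints the classpath that the hadoop command uses, which should list the elasticsearch-hadoop jars once HADOOP_CLASSPATH is exported.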

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/25722958/
