java - Indexing into Elasticsearch using Spark in standalone mode

Tags: java elasticsearch apache-spark

I want to index documents into Elasticsearch from Spark. It throws the following exception...

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1967)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:110)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:52)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark$.saveToEs(JavaEsSpark.scala:54)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark.saveToEs(JavaEsSpark.scala)
    at com.tgt.search.metrics.es.bulk.Sparkimporter.main(Sparkimporter.java:88)
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1967)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:110)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:372)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is my code...

    // Configure Spark in local (standalone) mode and point es-hadoop at the local cluster
    SparkConf conf = new SparkConf().setMaster("local")
            .setAppName("Indexer").set("spark.driver.maxResultSize", "2g");
    conf.set("es.index.auto.create", "true");
    conf.set("es.nodes", "localhost");
    conf.set("es.port", "9200");
    conf.set("es.write.operation", "index");
    JavaSparkContext sc = new JavaSparkContext(conf);
    Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
    Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

    // Parallelize the two documents and write them to the spark/docs index
    JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(numbers, airports));
    JavaEsSpark.saveToEs(javaRDD, "spark/docs");

I tried writing to a local file instead, and that works fine... so this is probably a configuration problem.

These are the dependencies in my pom.xml:

<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> 
        <version>2.6.4</version> </dependency> -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

</dependencies>

Best Answer

The relevant line in the stack trace is:

java.lang.StringIndexOutOfBoundsException: String index out of range: -1 at java.lang.String.substring(String.java:1967) at ...

This error does not come from your code. It is caused by an incompatibility between your Elasticsearch version and the elasticsearch-hadoop adapter you are using. Versions 2.0.x and 2.1.x of the elasticsearch-hadoop adapter only work with Elasticsearch 1.x. I hit the same error with Elasticsearch 2.1.1 and had to downgrade my Elasticsearch version to 1.4.4; the error then went away.
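To make the cluster and adapter versions line up, the pom could pin the `elasticsearch` artifact to the 1.x line. A minimal sketch, assuming you downgrade the server to a matching 1.x release (1.4.4 is the version that worked for me; other 1.x releases may also work, but I have not verified them):

```xml
<!-- Sketch: elasticsearch-spark 2.1.x talks to Elasticsearch 1.x clusters,
     so the client jar and the server should both be on the 1.x line. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>1.4.4</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
```

Note that the jar version alone is not enough: the server you point `es.nodes` at must also be a 1.x cluster, since the failure happens while the adapter parses the node-discovery response from the server.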

See costin's answer here.

Regarding "java - Indexing into Elasticsearch using Spark in standalone mode", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34238125/
