apache-spark - Connecting Spark and Elasticsearch

Tags: apache-spark elasticsearch

I am trying to run a simple Spark program that copies the contents of an RDD into an Elasticsearch document. Both Spark and Elasticsearch are installed on my local machine.

    import org.elasticsearch.spark.sql._
    import org.apache.spark.sql.SparkSession

    object ES {

      case class Person(ID: Int, name: String, age: Int, numFriends: Int)

      // Parse one CSV line (ID,name,age,numFriends) into a Person
      def mapper(line: String): Person = {
        val fields = line.split(',')
        Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
      }

      def main(args: Array[String]): Unit = {

        val spark: SparkSession =
          SparkSession
            .builder().master("local[*]")
            .appName("SparkEs")
            .config("es.index.auto.create", "true")
            .config("es.nodes", "localhost:9200")
            .getOrCreate()

        import spark.implicits._

        val lines = spark.sparkContext.textFile("/home/herch/fakefriends.csv")
        val people = lines.map(mapper).toDF()

        people.saveToEs("spark/people")
      }
    }

After many retries, I get this error:
 INFO HttpMethodDirector: I/O exception (java.net.ConnectException) 
 caught when processing request:Connection timed out (Connection timed 
 out)

 INFO HttpMethodDirector: Retrying request

 INFO DAGScheduler: ResultStage 0 (runJob at EsSparkSQL.scala:97) 
 failed in 525.902 s due to Job aborted due to stage failure: Task 1 
 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in 
 stage 0.0 (TID 1, localhost, executor driver): 
 org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: 
 Connection error (check network and/or proxy settings)- all nodes 
 failed; tried [[192.168.0.22:9200]] 

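This looks like a connection problem, but I cannot work out what is causing it. Elasticsearch is running on my local machine at localhost:9200, and I can query it from the terminal.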

Best answer

As shown on the elasticsearch / spark connector documentation page, you need to separate the host and port parameters in the configuration:

    val options13 = Map("path" -> "spark/index",
                        "pushdown" -> "true",
                        "es.nodes" -> "someNode", "es.port" -> "9200")

Note how es.nodes contains only the host name, while es.port contains the HTTP port.
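Applied to the SparkSession from the question, that suggestion might look like the sketch below. It assumes the elasticsearch-spark connector is on the classpath and that Elasticsearch is reachable at localhost on port 9200. The object name `EsSeparateHostPort` and the two in-memory rows are hypothetical stand-ins for the original object and for fakefriends.csv.

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._

    // Hypothetical object name, used only for this illustration
    object EsSeparateHostPort {

      case class Person(ID: Int, name: String, age: Int, numFriends: Int)

      def main(args: Array[String]): Unit = {
        val spark: SparkSession =
          SparkSession
            .builder().master("local[*]")
            .appName("SparkEs")
            .config("es.index.auto.create", "true")
            .config("es.nodes", "localhost") // host name only
            .config("es.port", "9200")       // HTTP port given separately
            .getOrCreate()

        import spark.implicits._

        // Made-up sample rows standing in for the CSV file in the question
        val people = Seq(
          Person(0, "Alice", 33, 385),
          Person(1, "Bob", 26, 2)
        ).toDF()

        people.saveToEs("spark/people")

        spark.stop()
      }
    }

Only the two es.* settings differ from the configuration in the question; the rest of the job is unchanged.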

Regarding apache-spark - connecting Spark and Elasticsearch, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47651162/
