I'm trying to run a simple piece of Spark code that copies the contents of an RDD into Elasticsearch documents. Both Spark and Elasticsearch are installed on my local machine.
```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object ES {
  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession
        .builder().master("local[*]")
        .appName("SparkEs")
        .config("es.index.auto.create", "true")
        .config("es.nodes", "localhost:9200")
        .getOrCreate()

    import spark.implicits._

    val lines = spark.sparkContext.textFile("/home/herch/fakefriends.csv")
    val people = lines.map(mapper).toDF()
    people.saveToEs("spark/people")
  }
}
```
I'm getting the following error after many retries:
```
INFO HttpMethodDirector: I/O exception (java.net.ConnectException)
caught when processing request: Connection timed out (Connection timed out)
INFO HttpMethodDirector: Retrying request
INFO DAGScheduler: ResultStage 0 (runJob at EsSparkSQL.scala:97)
failed in 525.902 s due to Job aborted due to stage failure: Task 1
in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in
stage 0.0 (TID 1, localhost, executor driver):
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException:
Connection error (check network and/or proxy settings) - all nodes
failed; tried [[192.168.0.22:9200]]
```
It looks like a connection problem, but I can't work out the cause. Elasticsearch is running on localhost:9200 on my local machine, and I can query it from the terminal.
Best answer
As shown on the elasticsearch/spark connector documentation page, you need to separate the host and port parameters inside the configuration:
```scala
val options13 = Map(
  "path"     -> "spark/index",
  "pushdown" -> "true",
  "es.nodes" -> "someNode",
  "es.port"  -> "9200"
)
```
Note how es.nodes contains only the hostname, while es.port contains the HTTP port.
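Applied to the code in the question, a minimal sketch of the corrected SparkSession setup might look like this. The index name and settings are taken from the question; `es.nodes.discovery` is an additional elasticsearch-hadoop option, suggested here only because the log above shows the connector falling back to the node's published address (192.168.0.22) rather than localhost:

```scala
import org.apache.spark.sql.SparkSession

val spark: SparkSession =
  SparkSession
    .builder().master("local[*]")
    .appName("SparkEs")
    .config("es.index.auto.create", "true")
    // Host and port are given separately, as the connector expects.
    .config("es.nodes", "localhost")
    .config("es.port", "9200")
    // Optional: keep the connector pinned to the configured node instead of
    // discovering and using the cluster's published addresses.
    .config("es.nodes.discovery", "false")
    .getOrCreate()
```

With this configuration, `people.saveToEs("spark/people")` should connect to localhost:9200 rather than the discovered 192.168.0.22 address.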
Regarding apache-spark - connecting Spark and Elasticsearch, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47651162/