apache-spark - How do I write to a remote Elasticsearch node from Spark?

Tags: apache-spark hadoop elasticsearch

I have the following code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
val sc = new SparkContext(conf)
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "1.2.3.4")
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

But when I run it, it tries to connect to localhost:
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
19/06/11 11:56:16 ERROR rest.NetworkClient: Node [127.0.0.1:9200] failed (Connection refused (Connection refused)); no other nodes left - aborting...
19/06/11 11:56:16 ERROR rest.NetworkClient: Node [127.0.0.1:9200] failed (Connection refused (Connection refused)); no other nodes left - aborting...
19/06/11 11:56:16 ERROR executor.Executor: Exception in task 2.0 in stage 2.0 (TID 18)
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'

How do I configure it to write to a remote ES server?

Best Answer

See the configuration documentation:

es.nodes.discovery (default true)
Whether to discover the nodes within the Elasticsearch cluster, or only to use the ones given in es.nodes for metadata queries. Note that this setting only applies during start-up; afterwards, when reading and writing, elasticsearch-hadoop uses the target index shards (and their hosting nodes) unless es.nodes.client.only is enabled.

Set es.nodes.discovery to false.
For example:

EsSpark.saveToEs(userTweetRDD, "twitter/test", Map("es.nodes" -> "xx.xx.xx.xxx", "es.cluster.name" -> "xxxx-xxxxx"))

and add the following entry to that configuration Map:

"es.nodes.discovery" -> "false"

In your case, your example becomes:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._


val conf: SparkConf = new SparkConf().setAppName("MYESAPP")
  .setMaster("local") // use "local" for local testing; if you are running on YARN, use "yarn"

conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "1.2.3.4")
conf.set("es.nodes.discovery", "false")



val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
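
If the cluster is only reachable across a WAN or a cloud boundary (the error message above points at es.nodes.wan.only for exactly that case), the same settings can also be passed per write instead of on the SparkConf. A minimal sketch, assuming the same 1.2.3.4 address and that the cluster does sit behind such a boundary:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf().setAppName("MYESAPP").setMaster("local")
val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// Per-write configuration: node discovery off and WAN-only mode on, so the
// connector talks only to the address you give it instead of 127.0.0.1.
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs", Map(
  "es.nodes" -> "1.2.3.4",
  "es.nodes.discovery" -> "false",
  "es.nodes.wan.only" -> "true"))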

Regarding "apache-spark - How do I write to a remote Elasticsearch node from Spark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56547653/
