scala - Elasticsearch-Hadoop 库无法连接到 docker 容器

标签 scala elasticsearch apache-spark docker elasticsearch-hadoop

我有从 Cassandra 读取数据、处理/转换/过滤数据并将结果写入 Elasticsearch 的 Spark 作业。我使用 docker 进行集成测试,但在从 spark 写入 Elasticsearch 时遇到了麻烦。

依赖关系:

"joda-time"              % "joda-time"          % "2.9.4",
"javax.servlet"          %  "javax.servlet-api" % "3.1.0",
"org.elasticsearch"      %  "elasticsearch"     % "2.3.2",
"org.scalatest"          %% "scalatest"         % "2.2.1",
"com.github.nscala-time" %% "nscala-time"       % "2.10.0",
"cascading"              %   "cascading-hadoop" % "2.6.3",
"cascading"              %   "cascading-local"  % "2.6.3",
"com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch"      %  "elasticsearch-hadoop"      % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark"       %% "spark-catalyst"            % "1.4.0" % "provided"

在我的单元测试中,我可以使用 TransportClient 连接到 elasticsearch 以设置我的模板和索引

又名。这行得通

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.input.split.size_in_mb", "67108864")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  .set("http.publish_host", "")
sc = new SparkContext(conf)
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet()

但是当我尝试运行时

EsSpark.saveToEs(
  myRDD,
  "hot/mytype",
  Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)

我收到这个堆栈跟踪

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我可以使用“docker network inspect bridge”验证它是否正在尝试连接到正确的 IP 地址。

docker network inspect bridge
[
{
    "Name": "bridge",
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1",
    "Scope": "local",
    "Driver": "bridge",
    "EnableIPv6": false,
    "IPAM": {
        "Driver": "default",
        "Options": null,
        "Config": [
            {
                "Subnet": "172.17.0.0/16",
                "Gateway": "172.17.0.1"
            }
        ]
    },
    "Internal": false,
    "Containers": {
        "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": {
            "Name": "analytics_rabbitmq_1",
            "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4",
            "MacAddress": "02:42:ac:11:00:04",
            "IPv4Address": "172.17.0.4/16",
            "IPv6Address": ""
        },
        "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": {
            "Name": "analytics_elasticsearch_1",
            "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56",
            "MacAddress": "02:42:ac:11:00:02",
            "IPv4Address": "172.17.0.2/16",
            "IPv6Address": ""
        },
        "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": {
            "Name": "analytics_cassandra_1",
            "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6",
            "MacAddress": "02:42:ac:11:00:03",
            "IPv4Address": "172.17.0.3/16",
            "IPv6Address": ""
        }
    },
    "Options": {
        "com.docker.network.bridge.default_bridge": "true",
        "com.docker.network.bridge.enable_icc": "true",
        "com.docker.network.bridge.enable_ip_masquerade": "true",
        "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
        "com.docker.network.bridge.name": "docker0",
        "com.docker.network.driver.mtu": "1500"
    },
    "Labels": {}
}
]

我在 macbook/osx 上本地运行所有内容。我不知道为什么我可以使用 TransportClient 并通过我的浏览器连接到 docker 容器,但函数 EsSpark.saveToES(...) 总是失败。

最佳答案

通过设置

.config("es.nodes.wan.only", "true")

可以解决这个问题

es.nodes.ingest.only

(default false) Whether to use Elasticsearch ingest nodes only. When enabled, elasticsearch-hadoop will route all of its requests (after nodes discovery, if enabled) through the ingest nodes within the cluster. The purpose of this configuration setting is to avoid incurring the cost of forwarding data meant for a pipeline from non-ingest nodes; Really only useful when writing data to an Ingest Pipeline (see es.ingest.pipeline above).

关于scala - Elasticsearch-Hadoop 库无法连接到 docker 容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38836893/

相关文章:

scala - 加上不能在 Scala 解释器中工作

apache-spark - 为什么启动 spark-shell 失败并显示 "we couldn' t find any external IP address!"在 Windows 上?

apache-spark - Spark Streaming 中的 groupby 理想策略

python - 使用 pyspark 将数据框中的列调用到函数中

elasticsearch - 过滤器汇总中的AND/OR操作

基于 Java 的 Android 应用程序 -> 切换到 Scala

sql - Spark 计算分组依据中的单词数

scala - 为什么编译器不会因 Spark 列表达式不是 BooleanType 而引发错误?

ubuntu - 错误日志 Elasticsearch 的默认位置

elasticsearch - Elasticsearch-如何使用不同的分析器索引同一字段