java - Apache Spark 与 elasticsearch V5.X

标签 java apache-spark elasticsearch

我是 Java 新手,我正在寻找 elasticsearch V5.X 和 Spark 之间连接器的一些示例,以便了解一些用例。

目前这是我的代码:

package Spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import scala.collection.immutable.Map;
import twitter4j.Status;

import org.apache.spark.api.java.JavaSparkContext;                              
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 

public class EsSpark {



    public EsSpark(){

        SparkConf conf = new SparkConf().setAppName("MyApp1").setMaster("localhost");
        conf.set("es.index.auto.create", "true");

        JavaSparkContext jsc = new JavaSparkContext(conf);  
        Map<String, ?> numbers = (Map<String, ?>) ImmutableMap.of("one", 1, "two", 2);                   
        Map<String, ?> airports = (Map<String, ?>) ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));


        JavaEsSpark.saveToEs(javaRDD, "spark/docs");    
        }

}

谢谢。

最佳答案

除非您使用 Elasticsearch 的本地实例,否则需要提供一些重要设置,特别是 es.nodes

您可以使用

conf.set("es.nodes", "eshost:9200");

您甚至可以指定多个实例,首选主节点,但不需要所有节点。

请引用official documentation .

讨论论坛的人们 elastic经常发布一些可以用作示例的代码。

确保提供多个文档作为 EsSparkEsSparkStreaming 对象。不要每次发送 1 个文档,最好发送多个文档。

EsSparkEsSparkStreaming 连接到您提供的节点,它们检查集群拓扑(节点数量、节点类型),并将数据直接发送到数据节点并连接到正确的分片(避免跳跃)。 可以防止将数据直接推送到数据节点(使用此 section of the documentation 中指定的设置),但您会引入瓶颈。

关于java - Apache Spark 与 elasticsearch V5.X,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44756209/

相关文章:

java - 插入并发 HashMap

java - Jitpack "ERROR: No build artifacts found"怎么解决?

java - 如何提高通过 HttpURLConnection 下载时的下载速度?

apache-spark - 如何在 Spark 2.4.4 中使用增量创建表?

php - Lucene样式查询字符串Elasticsearch PHP

Java Pokemon 程序 --> 删除 map 条目时出现并发修改异常

apache-spark - Spark 流: How to load a Pipeline on a Stream?

linux - netcat 能否以直通方式工作

database - 根据数组中的AND条件过滤ElasticSearch结果

elasticsearch - 有没有办法从脚本访问 "inner_hits"数据