我是 Java 新手,我正在寻找 elasticsearch V5.X 和 Spark 之间连接器的一些示例,以便了解一些用例。
目前这是我的代码:
package Spark;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import scala.collection.immutable.Map;
import twitter4j.Status;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
public class EsSpark {
public EsSpark(){
SparkConf conf = new SparkConf().setAppName("MyApp1").setMaster("localhost");
conf.set("es.index.auto.create", "true");
JavaSparkContext jsc = new JavaSparkContext(conf);
Map<String, ?> numbers = (Map<String, ?>) ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = (Map<String, ?>) ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");
}
}
谢谢。
最佳答案
除非您使用 Elasticsearch 的本地实例,否则需要提供一些重要设置,特别是 es.nodes
。
您可以使用
conf.set("es.nodes", "eshost:9200");
您甚至可以指定多个实例,首选主节点,但不需要所有节点。
讨论论坛的人们 elastic经常发布一些可以用作示例的代码。
确保提供多个文档作为 EsSpark
或 EsSparkStreaming
对象。不要每次发送 1 个文档,最好发送多个文档。
EsSpark
或 EsSparkStreaming
连接到您提供的节点,它们检查集群拓扑(节点数量、节点类型),并将数据直接发送到数据节点并连接到正确的分片(避免跳跃)。
可以防止将数据直接推送到数据节点(使用此 section of the documentation 中指定的设置),但您会引入瓶颈。
关于java - Apache Spark 与 elasticsearch V5.X,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44756209/