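elasticsearch - Error saving data from Spark to Elasticsearch - saveToEs

Tags: elasticsearch apache-spark

I am trying to save the output of an RDD to Elasticsearch. When I try to send it, I get errors even though I have included several elasticsearch-spark libraries. I am new to Elasticsearch and would appreciate any help. Thanks.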

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ElasticSpark {

    def main(args: Array[String]): Unit = {

        val logfile = "/Users/folder/Desktop/logfile.rtf"
        val conf = new SparkConf().setMaster("local[1]").setAppName("RddTest") // master can be local[N], local[*], a Spark cluster, Mesos, ...
        conf.set("es.index.auto.create", "true")
        val sc = new SparkContext(conf)

        val logdata = sc.textFile(logfile) // number of partitions
        val NumA = logdata.filter(line => line.contains("a")).count()
        val wordcount = logdata.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

        println(wordcount.collect().mkString(", ")) // print the pairs rather than the array reference
        wordcount.saveAsTextFile("/Users/folder/Desktop/sample") // success
        wordcount.saveToEs("spark/docs")

    }
}

Error

Error:(21, 15) value saveToEs is not a member of org.apache.spark.rdd.RDD[(String, Int)]
    wordcount.saveToEs("spark/docs")
              ^
Error:(6, 12) object elasticsearch is not a member of package org
import org.elasticsearch.spark._
           ^

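Best answer

Elasticsearch support is not part of the Spark distribution. It is provided by elasticsearch-hadoop, so you need to supply that dependency yourself. Both compile errors have the same cause: without this library on the classpath, the org.elasticsearch.spark import cannot be resolved, and so saveToEs is never added to the RDD. If you use Maven, add this to your pom.xml: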

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>2.2.0</version>
</dependency>

For sbt, add this to build.sbt:

libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.2.0" % "compile"
resolvers ++= Seq("clojars" at "https://clojars.org/repo",
                  "conjars" at "http://conjars.org/repo",
                  "plugins" at "http://repo.spring.io/plugins-release",
                  "sonatype" at "http://oss.sonatype.org/content/groups/public/")
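
Once the dependency resolves, the import and the saveToEs call from the question should compile. Here is a minimal sketch of the write, assuming an Elasticsearch node reachable at localhost:9200 (the es.nodes and es.port values are assumptions, not taken from the question). The toDocument helper and the field names word and count are hypothetical. The sketch turns each (word, count) pair into a Map because the elasticsearch-hadoop documentation demonstrates saveToEs with Map and case-class documents.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // resolves only when elasticsearch-hadoop is on the classpath

object WordCountToEs {

  // Hypothetical helper: the field names "word" and "count" are illustrative choices.
  def toDocument(word: String, count: Int): Map[String, Any] =
    Map("word" -> word, "count" -> count)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("RddTest")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "localhost") // assumption: a local Elasticsearch node
      .set("es.port", "9200")       // assumption: default HTTP port

    val sc = new SparkContext(conf)
    try {
      // Same word count as in the question.
      val wordcount = sc.textFile("/Users/folder/Desktop/logfile.rtf")
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // Convert each pair to a document, then index it into spark/docs.
      wordcount
        .map { case (word, count) => toDocument(word, count) }
        .saveToEs("spark/docs")
    } finally {
      sc.stop() // release the context even if the write fails
    }
  }
}
```

If the write fails at runtime rather than at compile time, the connection settings are the first thing to check, since the dependency itself only affects compilation and class loading.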

Regarding "elasticsearch - Error saving data from Spark to Elasticsearch - saveToEs", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/36816313/
