scala - Spark Streaming + Elasticsearch: cannot resolve symbol saveToEs

Tags: scala elasticsearch apache-spark spark-streaming

I want to save an RDD[(Object,Object)] called messages to Elasticsearch with saveToEs. In the code below I iterate over a DStream[String] named transformed, and for each RDD[String] rdd I use prepare to build the RDD[(Object,Object)]. The problem is that saveToEs is marked red ("cannot resolve symbol"), even though I added the elasticsearch-hadoop artifact (version 2.3.2) to my pom.xml:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    // convert each JSON string into a (key, value) pair of Writables
    val messages = rdd.map(prepare)
    messages.saveToEs(ec.getResource().toString)
  }
})

import org.apache.hadoop.io.{MapWritable, NullWritable, Text}

import scala.util.parsing.json.JSON

private def prepare(message: String): (Object, Object) = {

   // parse the JSON message into a Map, falling back to an empty Map on failure
   val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String, String]]
      case None => Map.empty[String, String]
   }

   // Elasticsearch generates the document id, so the key is a NullWritable
   val kw = NullWritable.get

   // copy the parsed fields into a Hadoop MapWritable as the document body
   val vw = new MapWritable
   for ((k, v) <- m) vw.put(new Text(k), new Text(v))

   (kw, vw)
}

Best Answer

You need to import the appropriate package in order to use the saveToEs method, which is defined in org.elasticsearch.spark, and make sure the connector dependency is on your classpath:

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark_2.10 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark_2.10</artifactId>
    <version>2.2.0</version>
</dependency>
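As a minimal sketch of how the import brings saveToEs into scope (assuming the elasticsearch-spark jar above is on the classpath and an Elasticsearch node is reachable; the "es.nodes" endpoint and the "spark/docs" index/type resource below are placeholder values, not taken from the question):

```scala
import org.apache.spark.{SparkConf, SparkContext}
// This import adds saveToEs to every RDD via an implicit conversion;
// without it the compiler reports "cannot resolve symbol saveToEs".
import org.elasticsearch.spark._

object SaveToEsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("saveToEs-example")
      .setMaster("local[*]")
      .set("es.nodes", "localhost:9200") // placeholder Elasticsearch endpoint

    val sc = new SparkContext(conf)

    // Once org.elasticsearch.spark._ is in scope, saveToEs resolves on the RDD.
    val docs = sc.makeRDD(Seq(Map("message" -> "hello"), Map("message" -> "world")))
    docs.saveToEs("spark/docs") // placeholder "index/type" resource

    sc.stop()
  }
}
```

The same import is what the streaming code in the question needs: the call inside foreachRDD compiles only once the implicit conversion is visible in that file.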

Resources:

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-scala

On scala - Spark Streaming + Elasticsearch: cannot resolve symbol saveToEs, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38080206/
