我想使用RDD[(Object,Object)] messages
将saveToEs
保存到ElasticSearch中。在下面的代码中,我遍历DStream[String] transformed
,并为每个RDD[String] rdd
使用RDD[(Object,Object)]
创建prepare
。问题是,尽管我将 Artifact saveToEs
(版本2.3.2)添加到elasticsearch-hadoop
中,但pom.xml
标记为红色:
transformed.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val messages = rdd.map(prepare)
messages.saveToEs(ec.getResource().toString)
}
})
private def prepare(message:String):(Object,Object) = {
val m = JSON.parseFull(message) match {
case Some(map) => map.asInstanceOf[Map[String,String]]
case None => Map.empty[String,String]
}
val kw = NullWritable.get
val vw = new MapWritable
for ((k, v) <- m) vw.put(new Text(k), new Text(v))
(kw, vw)
}
最佳答案
您需要导入适当的软件包才能使用org.elasticsearch.spark
中定义的saveToES方法
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark_2.10 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark_2.10</artifactId>
<version>2.2.0</version>
</dependency>
资源:
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-scala
关于scala - Spark Streaming + Elasticsearch:无法解析符号saveToEs,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38080206/