scala - 如何在Spark和Elasticsearch中迭代hadoop MapWritable

标签 scala elasticsearch apache-spark

我对Spark和Scala都不熟悉。我已经在互联网上阅读了一些文章。我可以使用Spark成功地从Elasticsearch获取文档,但是我对如何从文档中提取字段感到困惑。

我做了什么

我有33,617个文档:

import ...

val conf = new JobConf()

conf.set("es.resource", "index-name/type-name")
conf.set("es.nodes", "hostname1:9200,hostname2:9200")
conf.set("es.query", "{...}")

val esRDD = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])


scala> esRDD.count() // That's GOOD!
res11: Long = 33617

scala> esRDD.take(5).foreach(row => println(row._2))
{@version=1, field1=a, ...}
{@version=1, field1=a, ...} 
{@version=1, field1=b, ...}
{@version=1, field1=b, ...}
{@version=1, field1=b, ...}

问题1:如何打印特定字段。

我不知道如何在Scala中使用org.apache.hadoop.io.MapWritable
// Error!!
scala> esRDD.take(5).foreach(row => println(row._2("field1")))
error: org.apache.hadoop.io.MapWritable does not take parameters
              esRDD.take(5).foreach(row => println(row._2("field1")))

// Oops. null is printed
scala> esRDD.take(5).foreach(row => println(row._2.get("field1")))
null
null
null
null
null

问题2:如何按计数分组

我的最终目标是按field1进行汇总并按如下所示打印其计数:
scala> esRDD.groupBy(???).mapValues(_.size)
Map(a => 2, b => 3) // How to get this output??

但是,我无法弄清楚。

@Mateusz的答案测试
$ bin/spark-shell --master local --jars jars/elasticsearch-spark_2.11-2.2.0.jar

scala> import org.elasticsearch.spark._

scala> val rdd: RDD[(String, Map[String, Any])] = sc.esRDD("index-name/type-name")
<console>:45: error: not found: type RDD
          val rdd: RDD[(String, Map[String, Any])] = sc.esRDD("index-name/type-name")
                   ^

scala> sc.esRDD("index-name/type-name")
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at org.elasticsearch.spark.rdd.EsSpark$.esRDD(EsSpark.scala:26)
        at org.elasticsearch.spark.package$SparkContextFunctions.esRDD(package.scala:20)

最佳答案

Elasticsearch-hadoop具有对Spark的本地支持,我建议使用它-API更加简单:

import org.elasticsearch.spark._        

val rdd: RDD[(String, Map[String, Any])] = sc.esRDD("index-name/type-name")

这是一个简单的元组rdd,其中键是文档ID,而Map表示您的ES文档。

您可以将其映射到其他元组中,如下所示:
val mapped = rdd.map{ case(id, doc) => (doc.get("field1").get, 1) }

我输入1,因为看来您在其他任何地方都不需要doc。然后执行groupByKey和 map :
mapped.groupByKey().map{ case(key,val) => (key, val.size) }

另外,如果仅使用Spark连接器,则不需要整个es-hadoop依赖关系,这相当大,您可以使用elasticsearch-spark

有关更多信息,您可以检查documentation

关于scala - 如何在Spark和Elasticsearch中迭代hadoop MapWritable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35931429/

相关文章:

scala - 如何使用 shapeless 在 Scala 列表中拆分

scala - 错误 : class Animal needs to be abstract, 因为:它有 5 个未实现的成员

apache-spark - spark 2.3.0, parquet 1.8.2 - spark write 生成的文件中不存在二进制字段的统计信息?

scala - 从提供的等动态更改 sbt 构建文件中的库依赖项

scala - 通过 Spark 读取文件夹中保存的所有 Parquet 文件

scala - 如何在不使我的整个应用程序不安全的情况下忽略 Play Framework WS SSL 证书?

elasticsearch - 什么是FilteredQueryDescriptor的替代品?

python - 如何扩展elasticsearch使其每秒可以索引大量文档?

search - 刷新请求与清空Elasticsearch缓存之间的区别

apache-spark - Spark 和 Hive 之间的 Derby 版本不匹配 : Unable to instantiate org. apache.hadoop.hive.metastore.HiveMetaStoreClient