scala - 使用Spark将非规范化的Hive表加载到Elasticsearch中

标签 scala apache-spark elasticsearch hive apache-spark-sql

因此,我发现相反的答案很多,但事实并非如此。现在听起来很愚蠢,因为Elasticsearch只能处理非规范化数据,但这是我们遇到的问题。我们有一个表,其格式如下:

+----+--------+--------+--------+--------+---------+
| id | attr_1 | attr_2 | attr_3 | attr_4 | fst_nm  |
+----+--------+--------+--------+--------+---------+
|  1 |   2984 |   0324 |  38432 |        | john    |
|  2 |   2343 |  28347 | 238493 |  34923 | patrick |
|  3 |   3293 |   3823 |  38423 |  34823 | george  |
+----+--------+--------+--------+--------+---------+

当attr_x代表相同的事物时,假设当该表在规范化的世界中分开时,它们是另一个表的外键。因此,所有attrs都存在于单独的表中。但是,这些表已被删除,它们都被丢弃到一个长表中。通常,装入Elasticsearch并不是一个大问题,但是此表非常庞大,大约有1000多个列。我们想要获取这些attrs并将其作为数组存储在Elasticsearch中,如下所示:
_source: {
  "id": 1,
  "fst_nm": "john",
  "attrs": [
    2984,
    0324,
    38432
  ]
}

代替:
_source: {
  "id": 1,
  "fst_nm": "john",
  "attr_1": 2984,
  "attr_2": 0324,
  "attr_3": 38432
}

当我们使用默认的Spark流程时,它仅创建底部的Elasticsearch文档。我曾经想过要创建一个新的attrs表并将其取消透视,然后按ID查询该表以获取attrs,因此它看起来像这样:
+-----+--------+
| id  |  attr  |
+-----+--------+
|   1 |   2984 |
|   1 |   0324 |
|   1 |  38432 |
|   2 |   2343 |
| ... |    ... |
|   3 |  34823 |
+-----+--------+

然后,我们可以使用Spark SQL在此新创建的表上通过id查询,获取attrs,但是如何使用Spark将其作为数组插入到Elasticsearch中呢?

我的另一个想法是在Hive中创建一个新表,并将attrs更改为Hive复杂类型的数组,但是我不知道该怎么做。另外,如果我们使用Spark在Hive中查询表,当结果以数组形式返回时,是否容易转储到Elasticsearch中?

最佳答案

至于数据转换部分,可以使用array将几列作为数组收集到一列中,然后可以使用.write.json("jsonfile")写入json文件:

import org.apache.spark.sql.functions.col
val attrs = df.columns.filter(_.startsWith("attr")).map(col(_))

val df_array = df.withColumn("attrs", array(attrs:_*)).select("id", "fst_nm", "attrs")

df_array.toJSON.collect
//res8: Array[String] = Array({"id":1,"fst_nm":"john","attrs":[2984,324,38432,null]}, {"id":2,"fst_nm":"patrick","attrs":[2343,28347,238493,34923]})

写入文件:
df_array.write.json("/PATH/TO/jsonfile")

关于scala - 使用Spark将非规范化的Hive表加载到Elasticsearch中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45267472/

相关文章:

elasticsearch - 弹性休息高级API RolloverRequest

ubuntu - 为什么 elasticsearch 不能在 Ubuntu 14.04 上运行?

elasticsearch - Elasticsearch-将字符串拆分为不带空格或特殊字符的常见单词

mysql - 在 ScalikeJDBC 中使用 SQLSyntaxSupport 时找不到列名称

scala - 使用scalaz时如何为 bool 创建半组?

hadoop - 有没有办法让 Spark 在不使用 Hadoop 的情况下读取 AWS S3 文件?

scala - 迭代 org.apache.spark.sql.Row

Scala:根据参数返回不同的类型 - 无需强制转换

scala - 如何在 Play Framework 2 中将可选参数传递给 Scala 模板

eclipse - 无法禁用 Spark eclipse scala 日志