hadoop - 在具有 JSON 数据的 Parquet 文件上创建 Hive 表

标签 hadoop apache-spark hive hiveql parquet

我要达到的目标

  1. 从源大 JSON 文件中获取数据 (employee-sample.json)
  2. 一个简单的 spark 应用程序,将其作为文本文件读取并存储在 parquet (simple-loader.java) 中。我不知道 JSON 文件中有什么,所以我不能放置任何模式,所以我想要读取模式,而不是写入模式。一个包含一列名为“值”的 Parquet 文件,其中包含已创建的 JSON 字符串
  3. 在 parquet 文件上创建一个 HIVE 外部表,当我执行“select * from table”时,我看到一列带有 JSON 数据。

我真正需要的是创建一个 HIVE 表,它可以读取“值”列中的 JSON 数据并应用架构和发出列,这样我就可以根据需要在我的 RAW 数据上创建各种表。

我已经在 J​​SON 文件上创建了配置单元表,并提取了列,但是这个从 parquet 中提取列并应用 JSON 模式在欺骗我

employee-sample.json

{"name":"Dave", "age" : 30 , "DOB":"1987-01-01"}
{"name":"Steve", "age" : 31 , "DOB":"1986-01-01"}
{"name":"Kumar", "age" : 32 , "DOB":"1985-01-01"}

将 JSON 转换为 parquet 的简单 Spark 代码

simple-loader.java

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
            .appName(JsonToParquet.class.getName())
            .master("local[*]").getOrCreate();
    Dataset<String> eventsDataSet = sparkSession.read().textFile("D:\\dev\\employee-sample.json");
    eventsDataSet.createOrReplaceTempView("rawView");
    sparkSession.sqlContext().sql("select string(value) as value from rawView")
            .write()
            .parquet("D:\\dev\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

Parquet 文件上的 hive 表

CREATE EXTERNAL TABLE EVENTS_RAW (
VALUE STRING)
STORED AS PARQUET
LOCATION 'hdfs://XXXXXX:8020/employee/data_raw';

我尝试通过设置 JSON serde,但只有当数据存储在 JSON foram ROW FORMAT SERDE 'com.proofpoint.hive.serde.JsonSerde' 中时它才有效

预期格式

CREATE EXTERNAL TABLE EVENTS_DATA (
    NAME STRING,
    AGE STRING,
    DOB STRING)
??????????????????????????????

最佳答案

创建hive外部表示例:

 public static final String CREATE_EXTERNAL = "CREATE EXTERNAL TABLE %s" +
        " (%s) " +
        " PARTITIONED BY(%s) " +
        " STORED AS %s" +
        " LOCATION '%s'";
/**
 * Will create an external table and recover the partitions
 */
public void createExternalTable(SparkSession sparkSession, StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    String createQuery = createTableString(schema, tableName, format, partitions, tablePath);
    logger.info("Going to create External table with the following query:\n " + createQuery);
    sparkSession.sql(createQuery);
    logger.debug("Finish to create External table with the following query:\n " + createQuery);
    recoverPartitions(sparkSession, tableName);
}

public String createTableString(StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    Set<String> partitionNames = partitions.stream().map(struct -> struct.name()).collect(Collectors.toSet());
    String columns = Arrays.stream(schema.fields())
            //Filter the partitions
            .filter(field -> !partitionNames.contains(field.name()))
            //
            .map(HiveTableHelper::fieldToStringBuilder)
            .collect(Collectors.joining(", "));

    String partitionsString = partitions.stream().map(HiveTableHelper::fieldToStringBuilder).collect(Collectors.joining(", "));

    return String.format(CREATE_EXTERNAL, tableName, columns, partitionsString, format.name(), tablePath);
}

/**
 *
 * @param sparkSession
 * @param table
 */
public void recoverPartitions(SparkSession sparkSession, String table){
    String query = "ALTER TABLE " + table + " RECOVER PARTITIONS";
    logger.debug("Start: " + query);
    sparkSession.sql(query);
    sparkSession.catalog().refreshTable(table);
    logger.debug("Finish: " + query);
}

  public static StringBuilder fieldToStringBuilder(StructField field){
    StringBuilder sb = new StringBuilder();
    sb.append(field.name()).append( " ").append(field.dataType().simpleString());
    return sb;
}

关于hadoop - 在具有 JSON 数据的 Parquet 文件上创建 Hive 表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47818707/

相关文章:

hadoop - Sqoop大数据:如何使用sqoop导入带有逗号的地址字段?

scala - 如何使用 Scala 运行具有分类特征集的 Spark 决策树?

scala - 如何根据user_id分区的其他列值计算行的差异

apache - Apache Ranger检查安装的版本

hadoop - hadoop流错误,使用python mapreduce

scala - 如何在 Spark 中使用 BLAS 库?

sql - 在配置单元中拆分列

azure - 适用于 Hive 的 Microsoft ODBC 驱动程序

hadoop - hive :Concat map

hadoop - 从 org.apache.hadoop.hive.ql.exec.DDLTask 创建配置单元表 : FAILED: Execution Error, 返回代码 1 时出错。元异常