java - Reading complex JSON in Spark SQL using Java

Tags: java spark-streaming

My JSON file looks like the one shown below, and I am trying to read all of the Name values under majorsector_percent with the following code.

Code:

    JavaSQLContext sQLContext = new JavaSQLContext(sc);
    sQLContext.jsonFile("C:/Users/HimanshuK/Downloads/world_bank/world_bank.json").registerTempTable("logs");
    sQLContext.sqlContext().cacheTable("logs");
    List s = sQLContext.sql("select majorsector_percent from logs limit 1 ")
                       .map(row -> new Tuple2<>(row.getString(0), row.getString(1)))
                       .collect();


JSON file:

     { "_id" : { "$oid" : "52b213b38594d8a2be17c780" }, "approvalfy" : 1999, "board_approval_month" : "November", "boardapprovaldate" : "2013-11-12T00:00:00Z", "borrower" : "FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA", "closingdate" : "2018-07-07T00:00:00Z", "country_namecode" : "Federal Democratic Republic of Ethiopia!$!ET", "countrycode" : "ET", "countryname" : "Federal Democratic Republic of Ethiopia", "countryshortname" : "Ethiopia", "docty" : "Project Information Document,Indigenous Peoples Plan,Project Information Document", "envassesmentcategorycode" : "C", "grantamt" : 0, "ibrdcommamt" : 0, "id" : "P129828", "idacommamt" : 130000000, "impagency" : "MINISTRY OF EDUCATION", "lendinginstr" : "Investment Project Financing", "lendinginstrtype" : "IN", "lendprojectcost" : 550000000, "majorsector_percent" : [ { "Name" : "Education", "Percent" : 46 }, { "Name" : "Education", "Percent" : 26 }, { "Name" : "Public Administration, Law, and Justice", "Percent" : 16 }, { "Name" : "Education", "Percent" : 12 } ], "mjtheme" : [ "Human development" ], "mjtheme_namecode" : [ { "name" : "Human development", "code" : "8" }, { "name" : "", "code" : "11" } ], "mjthemecode" : "8,11", "prodline" : "PE", "prodlinetext" : "IBRD/IDA", "productlinetype" : "L", "project_abstract" : { "cdata" : "The development  }, "project_name" : "Ethiopia General Education Quality Improvement Project II",  "projectfinancialtype" : "IDA", "projectstatusdisplay" : "Active", "regionname" : "Africa", "sector1" : { "Name" : "Primary education", "Percent" : 46 }, "sector2" : { "Name" : "Secondary education", "Percent" : 26 }, "sector3" : { "Name" : "Public administration- Other social services", "Percent" : 16 }, "sector4" : { "Name" : "Tertiary education", "Percent" : 12 }, "sectorcode" : "ET,BS,ES,EP", "source" : "IBRD", "status" : "Active", "supplementprojectflg" : "N", "theme1" : { "Name" : "Education for all", "Percent" : 100 }, "themecode" : "65", "totalamt" : 130000000, "totalcommamt" : 130000000, "url" : "http://www.worldbank.org/projects/P129828/ethiopia-general-education-quality-improvement-project-ii?lang=en" }

But because of the type cast I get the error below. How do I handle cases like this, and how can I find out the schema?

java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot be cast to java.lang.String

Best answer

The problem is that the result of this query is a structure that contains an array. When you map over the result and call row.getString(...) on that array, it fails with the ClassCastException because the underlying object is not a String.
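
To make this concrete in Java, here is a minimal sketch, assuming the Spark 1.4+ SQLContext/DataFrame Java API (not the older JavaSQLContext used in the question) and reusing the file path from the question; the point is that this column has to be read with Row.getList, not Row.getString:

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class MajorSectorExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("MajorSectorExample").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // Load the JSON file and register it as a temp table, as in the question.
            DataFrame logs = sqlContext.read().json("C:/Users/HimanshuK/Downloads/world_bank/world_bank.json");
            logs.registerTempTable("logs");

            Row row = sqlContext.sql("select majorsector_percent from logs limit 1").first();

            // Column 0 is array<struct<Name,Percent>>, not a string: reading it with
            // getString(...) is what throws the ClassCastException.
            List<Row> sectors = row.getList(0);
            for (Row sector : sectors) {
                String name = sector.getString(0); // Name
                long percent = sector.getLong(1);  // Percent (bigint/long in the schema)
                System.out.println(name + " -> " + percent);
            }

            sc.stop();
        }
    }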

The result of the SQL query is a DataFrame, and you can ask it for its schema like this (the same calls are available in the Java API):

scala> val data = sqlContext.sql("select majorsector_percent from logs limit 1 ")
data: org.apache.spark.sql.DataFrame = [majorsector_percent: array<struct<Name:string,Percent:bigint>>]

scala> data.printSchema
root
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Percent: long (nullable = true)
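
The same check is available from Java; a small sketch, continuing the Java example above:

    // Continuing the Java sketch above: inspect the inferred schema of the query result.
    DataFrame data = sqlContext.sql("select majorsector_percent from logs limit 1");
    data.printSchema(); // prints the same tree as the Scala shell output above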

You can extract the information you need from the resulting DataFrame like this:

data.select("majorsector_percent.Name","majorsector_percent.Percent")

scala> data.select("majorsector_percent.Name","majorsector_percent.Percent").collect
res4: Array[org.apache.spark.sql.Row] = Array([WrappedArray(Education, Education, Public Administration, Law, and Justice, Education),WrappedArray(46, 26, 16, 12)])
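
From Java the same column pruning is a select on the DataFrame; in this sketch (continuing the example above) each selected column comes back as a list inside a single Row:

    // Select the nested fields; the single result row holds two array columns.
    Row first = data.select("majorsector_percent.Name", "majorsector_percent.Percent").first();
    List<String> names = first.getList(0);   // [Education, Education, Public Administration, Law, and Justice, Education]
    List<Long> percents = first.getList(1);  // [46, 26, 16, 12]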

Or you can simplify the process by using a more specific query:

val directQuery = sqlContext.sql("select majorsector_percent.Name, majorsector_percent.Percent from logs limit 1 ")
directQuery: org.apache.spark.sql.DataFrame = [Name: array<string>, Percent: array<bigint>]

scala> directQuery.collect
res5: Array[org.apache.spark.sql.Row] = Array([WrappedArray(Education, Education, Public Administration, Law, and Justice, Education),WrappedArray(46, 26, 16, 12)])
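
The direct query also works from Java; a sketch with the same setup as above, which additionally zips the two parallel lists into the (Name, Percent) pairs the original map(row -> new Tuple2<>(...)) was trying to build (extra imports: scala.Tuple2, java.util.ArrayList):

    DataFrame directQuery = sqlContext.sql(
        "select majorsector_percent.Name, majorsector_percent.Percent from logs limit 1");
    Row r = directQuery.first();
    List<String> sectorNames = r.getList(0);
    List<Long> sectorPercents = r.getList(1);

    // Pair each Name with its Percent.
    List<Tuple2<String, Long>> pairs = new ArrayList<>();
    for (int i = 0; i < sectorNames.size(); i++) {
        pairs.add(new Tuple2<>(sectorNames.get(i), sectorPercents.get(i)));
    }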

Regarding java - Reading complex JSON in Spark SQL using Java, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36446560/
