arrays - How to properly explode fields in JSON using Spark SQL

Tags: arrays json apache-spark pyspark

I am trying to use spark.sql() to pull the data out for better performance, but I have an incredibly nested JSON and I cannot get the data out of it.

Here is the schema of the JSON:

root
 |-- httpStatus: long (nullable = true)
 |-- httpStatusMessage: string (nullable = true)
 |-- response: struct (nullable = true)
 |    |-- body: struct (nullable = true)
 |    |    |-- dataProviders: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- dataProviderId: long (nullable = true)
 |    |    |    |    |-- drivers: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- driverFirstName: string (nullable = true)
 |    |    |    |    |    |    |-- driverId: long (nullable = true)
 |    |    |    |    |    |    |-- driverLastName: string (nullable = true)
 |    |    |    |    |    |    |-- driverRef: string (nullable = true)
 |    |    |    |    |    |    |-- totalDistance: double (nullable = true)
 |    |    |    |    |    |    |-- vehicles: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- deviceId: long (nullable = true)
 |    |    |    |    |    |    |    |    |-- deviceRef: string (nullable = true)
 |    |    |    |    |    |    |    |    |-- trips: array (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |    |-- averageSpeed: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- tripDistanceTravelled: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- tripDuration: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- tripId: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- tripStart: struct (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- heading: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- mileage: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- speed: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- timestamp: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |-- tripStop: struct (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- heading: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- mileage: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- speed: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |    |    |-- timestamp: string (nullable = true)
 |    |    |    |    |    |    |    |    |-- vehicleId: long (nullable = true)
 |    |    |    |    |    |    |    |    |-- vehicleRef: string (nullable = true)
 |    |-- header: struct (nullable = true)
 |    |    |-- accelUnit: string (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- distanceUnit: string (nullable = true)
 |    |    |-- fleetId: long (nullable = true)
 |    |    |-- fleetName: string (nullable = true)
 |    |    |-- gpsUnit: string (nullable = true)
 |    |    |-- speedUnit: string (nullable = true)
 |-- timestamp: string (nullable = true)

I have been trying to explode my way down to the most deeply nested fields, but I cannot get past the ArrayType columns.

Here is a sample of my code:

json_df = spark.read.json('/user/myuser/drivers_directory/driverRates.json')

json_df.printSchema()

json_df.show()
+----------+-----------------+--------------------+-------------------+
|httpStatus|httpStatusMessage|            response|          timestamp|
+----------+-----------------+--------------------+-------------------+
|       200|          success|[[[[14, [[Eric, 1...|2020-11-11T19:46:01|
+----------+-----------------+--------------------+-------------------+

body_df = json_df.select('response.*')
body_df.show()

json_df.select('response.*').select('body.*').show()
+--------------------+
|       dataProviders|
+--------------------+
|[[14, [[Eric, 100...|
+--------------------+


json_df.select('response.*').select('body.*').select('dataProviders.dataProviderId').show()
+--------------+
|dataProviderId|
+--------------+
|          [14]|
+--------------+

However, selecting through an array column like this only returns an array of that field collected from every element (hence the [14] above); doing it level by level for every field is tedious and terrible for performance.
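For reference, the same flattening can also be done without spark.sql() by chaining explode calls in the DataFrame API. A minimal sketch, assuming the schema printed above (explode_outer is used so that rows whose arrays are null or empty are kept):

from pyspark.sql.functions import col, explode_outer

# One explode per array level: dataProviders first, then drivers.
flat_df = (json_df
    .select('httpStatus', 'httpStatusMessage', 'timestamp',
            explode_outer('response.body.dataProviders').alias('provider'))
    .select('httpStatus', 'httpStatusMessage', 'timestamp',
            col('provider.dataProviderId'),
            explode_outer('provider.drivers').alias('driver'))
    .select('httpStatus', 'httpStatusMessage', 'timestamp', 'dataProviderId',
            col('driver.driverId'), col('driver.driverRef'),
            col('driver.driverFirstName'), col('driver.driverLastName')))

flat_df.show()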

I have been trying to use spark.sql() to grab everything instead, but I keep getting errors about StructType and ArrayType.

I want something like this:

json_df.createOrReplaceTempView('driver_dictionary')

final_driver_df = spark.sql("""select
            httpStatus as status
            , httpStatusMessage as message
            , timestamp as time
            from driver_dictionary
            lateral view explode(response) as r
            """)

The problem I am running into is exploding body and the data below it. Depending on what I point the lateral view at, it fails with either a StructType error or an ArrayType error. Some help would be greatly appreciated.
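The cause of those errors, for what it is worth: explode() only accepts ArrayType (or MapType) columns, and both response and response.body are StructType, which is why explode(response) fails. Structs are reached with dot notation or .*; only the arrays (dataProviders, drivers, vehicles, trips) need a lateral view. A small illustration against the view registered above:

# A struct is navigated with dot notation; no explode is needed (or allowed)
spark.sql('select response.body.dataProviders from driver_dictionary').show()

# An array must be exploded before its elements' fields can be selected
spark.sql('''select dp.dataProviderId
             from driver_dictionary
             lateral view explode(response.body.dataProviders) dp_tbl as dp''').show()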

Best Answer

What I was looking for was something like this:

drivers_exploded_df = spark.sql('''select
    httpStatus
    , httpStatusMessage
    , response.header.*
    , dataProviders.dataProviderId
    , drivers.driverId
    , drivers.driverRef
    , drivers.driverFirstName
    , drivers.driverLastName
    , timestamp
    from driver_dictionary
    lateral view outer explode (response.body.dataProviders) providers_tbl as dataProviders
    lateral view outer explode (dataProviders.drivers) dataProviders_drivers as drivers''')
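If the trip-level fields are needed as well, the same pattern should extend down the remaining arrays with one lateral view per level. A sketch under the same schema assumptions (the table aliases are made up, and this is untested against the real data):

trips_exploded_df = spark.sql('''select
    dataProviders.dataProviderId
    , drivers.driverId
    , vehicles.vehicleId
    , trips.tripId
    , trips.tripDistanceTravelled
    , trips.tripStart.timestamp as tripStartTime
    , trips.tripStop.timestamp as tripStopTime
    from driver_dictionary
    lateral view outer explode(response.body.dataProviders) providers_tbl as dataProviders
    lateral view outer explode(dataProviders.drivers) drivers_tbl as drivers
    lateral view outer explode(drivers.vehicles) vehicles_tbl as vehicles
    lateral view outer explode(vehicles.trips) trips_tbl as trips''')

The outer keyword matters here: a plain lateral view explode drops rows whose array is null or empty, while lateral view outer explode keeps them with nulls, mirroring explode_outer in the DataFrame API.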

The original question, "arrays - How to properly explode fields in JSON using Spark SQL", can be found on Stack Overflow: https://stackoverflow.com/questions/64795032/
