我想知道是否可以使用 Spark Dataset API 或 Apache Spark 提供的任何其他功能来创建自定义 JSON。
我知道可以使用 join() 方法连接两个数据集,
但我想创建的自定义 JSON 是:把数据集 2(在我的例子中是 alerts,即警报)作为 JSON 对象,以 “ALERT” 为键添加到数据集 1(即 inventory,库存)中。
// Locations of the two JSON documents that are loaded below.
String inventoryPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\inventory.json";
String alertPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\alert.json";

// Dataset 1: inventory, read with the multiLine option and PERMISSIVE mode.
Dataset<Row> inventory = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(inventoryPath);

// Dataset 2: alerts, read with the same reader options.
Dataset<Row> alerts = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(alertPath);

// Join the two datasets without a join condition and print the resulting schema.
Dataset<Row> inventoryAlerts = inventory.join(alerts);
inventoryAlerts.printSchema();
库存和警报的架构如下。
root
|-- Equipment: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- collectedPid: string (nullable = true)
| | | |-- collectedSerialNum: string (nullable = true)
| | | |-- containingHwId: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- pceMultiPid: string (nullable = true)
| | | |-- pcePhyiscalType: string (nullable = true)
| | | |-- pcePid: string (nullable = true)
| | | |-- pceProductDescription: string (nullable = true)
| | | |-- pceProductFamily: string (nullable = true)
| | | |-- pceProductType: string (nullable = true)
| | | |-- pceRuleId: string (nullable = true)
| | | |-- productDescription: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- snasItemType: string (nullable = true)
| | | |-- snasProductFamily: string (nullable = true)
| | | |-- snasSerialNumber: string (nullable = true)
| | | |-- snasValidationCode: string (nullable = true)
| | | |-- snasValidationSource: string (nullable = true)
|-- LicenseActivated: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- count: long (nullable = true)
| | | |-- type: string (nullable = true)
|-- NetworkElement: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- hostname: string (nullable = true)
| | | |-- ipAddress: string (nullable = true)
| | | |-- isManagedNe: boolean (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- lastUpdateDate: long (nullable = true)
| | | |-- managedNeId: string (nullable = true)
| | | |-- managementAddress: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- neName: string (nullable = true)
| | | |-- neRegistrationStatus: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- smartLicenseProductInstanceIdentifier: string (nullable = true)
| | | |-- smartLicenseVirtualAccountName: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- systemUptime: long (nullable = true)
| | | |-- udiProductIdentifier: string (nullable = true)
|-- Versions: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- lastUpdated: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- version: string (nullable = true)
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
#####################################
root
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- neAlert: struct (nullable = true)
| |-- advisory: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- headlineName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- psirtId: long (nullable = true)
| | | |-- publicReleaseInd: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- fieldNotice: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- caveat: string (nullable = true)
| | | |-- distributionCode: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- fieldNoticeId: long (nullable = true)
| | | |-- fieldNoticeName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- hwEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinName: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hardwareEoXId: long (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| |-- swEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinHeadline: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- softwareEoXId: long (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
最佳答案
如果您想在连接的同时,把其中一个数据集的字段保留为嵌套结构,可以使用 struct 函数
创建一个 StructType 列,然后按如下方式连接:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.udf;
// Load the inventory document unchanged; its columns stay at the top level.
Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("path to json inventory");
// Load the alerts document, keep only the join key at the top level and pack the
// remaining alert columns into a single struct column named "ALERTS".
// The names passed to struct() are all the columns that you want in the nested field,
// comma separated; the list below follows the alerts schema printed in the question.
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
.json("path to alerts json")
.select(col("partyId"),
        struct("collectorId", "generatedAt", "managedNeId", "neAlert", "recordType",
                "sourceNeId", "sourcePartyId", "sourceSubPartyId", "wfid").as("ALERTS"));
// Join on the shared key. Calling join() without a key or condition would pair every
// inventory row with every alert row (a Cartesian product) instead of matching them.
Dataset<Row> inventoryAlerts = inventory.join(alerts, "partyId");
inventoryAlerts.printSchema();
连接之后,这应该能为您提供所需的架构。
关于java - 如何将 JSON 对象添加到 apache Spark 中的数据集/数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49576171/