java - 如何将 JSON 对象添加到 apache Spark 中的数据集/数据帧

标签 java apache-spark dataset

我想知道是否可以使用 Spark Dataset API 或 Apache Spark 提供的其他功能来创建自定义 JSON。我知道可以用 join() 方法连接两个数据集,但我想要的是一个自定义 JSON:把数据集 2(在我的例子中是警报 alerts)作为一个 JSON 对象,以“ALERT”为键,添加到数据集 1(即库存 inventory)中。

Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                    .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\inventory.json");
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                    .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\alert.json");

Dataset<Row> inventoryAlerts = inventory.join(alerts);
        inventoryAlerts.printSchema();

库存(inventory)和警报(alerts)的 schema 分别如下,两者之间用一行 # 号分隔。

root
 |-- Equipment: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- collectedPid: string (nullable = true)
 |    |    |    |-- collectedSerialNum: string (nullable = true)
 |    |    |    |-- containingHwId: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- tagName: string (nullable = true)
 |    |    |    |    |    |-- tagValue: string (nullable = true)
 |    |    |    |-- pceMultiPid: string (nullable = true)
 |    |    |    |-- pcePhyiscalType: string (nullable = true)
 |    |    |    |-- pcePid: string (nullable = true)
 |    |    |    |-- pceProductDescription: string (nullable = true)
 |    |    |    |-- pceProductFamily: string (nullable = true)
 |    |    |    |-- pceProductType: string (nullable = true)
 |    |    |    |-- pceRuleId: string (nullable = true)
 |    |    |    |-- productDescription: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- productType: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- snasItemType: string (nullable = true)
 |    |    |    |-- snasProductFamily: string (nullable = true)
 |    |    |    |-- snasSerialNumber: string (nullable = true)
 |    |    |    |-- snasValidationCode: string (nullable = true)
 |    |    |    |-- snasValidationSource: string (nullable = true)
 |-- LicenseActivated: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- count: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- NetworkElement: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- hostname: string (nullable = true)
 |    |    |    |-- ipAddress: string (nullable = true)
 |    |    |    |-- isManagedNe: boolean (nullable = true)
 |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- tagName: string (nullable = true)
 |    |    |    |    |    |-- tagValue: string (nullable = true)
 |    |    |    |-- lastUpdateDate: long (nullable = true)
 |    |    |    |-- managedNeId: string (nullable = true)
 |    |    |    |-- managementAddress: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- neName: string (nullable = true)
 |    |    |    |-- neRegistrationStatus: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- productType: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- smartLicenseProductInstanceIdentifier: string (nullable = true)
 |    |    |    |-- smartLicenseVirtualAccountName: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |    |    |    |-- systemUptime: long (nullable = true)
 |    |    |    |-- udiProductIdentifier: string (nullable = true)
 |-- Versions: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- lastUpdated: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

#####################################
root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- neAlert: struct (nullable = true)
 |    |-- advisory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- headlineName: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- psirtId: long (nullable = true)
 |    |    |    |-- publicReleaseInd: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |    |    |    |-- vulnerabilityReason: string (nullable = true)
 |    |    |    |-- vulnerabilityStatus: string (nullable = true)
 |    |-- fieldNotice: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- caveat: string (nullable = true)
 |    |    |    |-- distributionCode: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- fieldNoticeId: long (nullable = true)
 |    |    |    |-- fieldNoticeName: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- vulnerabilityReason: string (nullable = true)
 |    |    |    |-- vulnerabilityStatus: string (nullable = true)
 |    |-- hwEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinName: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- hardwareEoXId: long (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |-- swEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinHeadline: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- softwareEoXId: long (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

最佳答案

如果您想在连接的同时,把其中一个数据集的字段保留为嵌套结构,可以先用 struct 函数把这些字段打包成一个 StructType 列,然后再进行连接,如下所示。(注意:示例中的 $"partyId" 是 Scala 写法,在 Java 中应改用 functions.col("partyId")。)

import org.apache.spark.sql.functions.udf

Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                    .json("path to json inventory");
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                           .json("path to alerts json")
                           .select($"partyId", struct("columns").as("ALERTS"));
// Replace "columns" with the comma-separated names of all the columns you want as nested fields under ALERTS

Dataset<Row> inventoryAlerts = inventory.join(alerts);
        inventoryAlerts.printSchema();

连接之后,这样应该就能得到您想要的 schema。注意:示例中的 join 没有指定连接条件,实际使用时应按关联键连接,例如 inventory.join(alerts, "partyId")。

关于java - 如何将 JSON 对象添加到 apache Spark 中的数据集/数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49576171/

相关文章:

java - 使用双引号生成的字符串未在字符串池中生成

java.lang.AbstractMethodError: org.apache.xerces.dom.ElementImpl.getTextContent()Ljava/lang/String;

c# - 更新数据表中的单元格

c# - 我们可以将数据集传递给 Web 服务方法吗?如果是,那么如何?

c# - C# 排序后删除给定行索引的行

java - session 超时配置不起作用?

java - 如何从菜单处理程序重新实例化零件类?

scala - 使用 scala 2.12.10 时 Spark 的 pom.xml 依赖项

apache-spark - Pyspark Dataframe 中的 Cache()

scala - 从数据框中选择和处理数据的最有效方法