java - 如何将包含 JSON 的输入 CSV 数据转换为 Spark 数据集?

标签 java apache-spark

我有一个包含以下数据的 CSV:

dept|emp_json|location
finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH


我能够读取文件并创建数据集并提取 Json 列:

Dataset<Row> empDetlsDS = sparkSession.read().option("header", "true").option(delimiter, "|").schema(mySchema).csv(inputCSVPath);
Dataset<Row> empDetlsJsonDocDS = empDetlsDS.select(emp_json);

我想展平 JSON 并创建一个输出数据集,其中包含员工数组中存在的行数,格式如下:

dept    |emp_name   |emp_address              |emp_city|location  |
---------------------------------------------------------------
finance |John Doe   |1234 West Broad St 8505  |Columbus|OH        |
finance |Alex Messi |4321 North Meecham Rd 300|Salinas |OH        |
-------------------------------------------------------------------


如果有人对 Java 和 Spark 有任何建议,请提供帮助。提前致谢。

最佳答案

@tkkman 这是我所说的 scala 方式。 rdd 方式已被弃用,现在推荐使用 DataSet 方式,因此在 Java 中应该很简单

import spark.implicits._
import org.apache.spark.sql.functions._

val df = spark.read.option("delimiter","|").option("header","true").csv("/FileStore/tables/test.txt")

val jdf = spark.read.json(df.select("emp_json").rdd.map(_.toString)).select(explode($"employee").alias("emp"))
.select($"emp.name.firstName",$"emp.name.lasteName",$"emp.address.street",$"emp.address.unit",$"emp.address.city")

jdf.printSchema

jdf.withColumn("dept", lit("finance")).withColumn("city",lit("OH")).show(false)

+---------+---------+---------------------+----+----+-------+
|firstName|lasteName|street               |unit|city|dept   |
+---------+---------+---------------------+----+----+-------+
|John     |Doe      |1234 West Broad St   |8505|OH  |finance|
|Alex     |Messi    |4321 North Meecham Rd|300 |OH  |finance|
+---------+---------+---------------------+----+----+-------+

关于java - 如何将包含 JSON 的输入 CSV 数据转换为 Spark 数据集?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54430722/

相关文章:

hadoop - HBase 获取rowkey所在的值

java - 无法让 JUnit 测试失败(它应该失败)

java - 如何获取txt到数组的特定长度

apache-spark - Spark ML 随机森林和梯度增强树用于回归

scala - Spark yarn 返回退出代码未更新,因为 webUI 中失败 - spark 提交

apache-spark - 如何计算一天从 Kafka 主题中获取的消息数?

java - "your interface with the user can be via the command line"

java - 有没有办法在子类上实现一个方法,我可以从它的子类访问属性?

java - 如何在 ItestListener 中获取 WebElement

scala - Spark Scala 流式 CSV