I have a CSV with the following data:
dept|emp_json|location
finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH
I am able to read the file, create a Dataset, and extract the JSON column:
Dataset<Row> empDetlsDS = sparkSession.read().option("header", "true").option("delimiter", "|").schema(mySchema).csv(inputCSVPath);
Dataset<Row> empDetlsJsonDocDS = empDetlsDS.select("emp_json");
I want to flatten the JSON and produce an output Dataset with one row per element of the employee array, in the following format:
dept    |emp_name   |emp_address              |emp_city|location
----------------------------------------------------------------
finance |John Doe   |1234 West Broad St 8505  |Columbus|OH
finance |Alex Messi |4321 North Meecham Rd 300|Salinas |OH
----------------------------------------------------------------
If anyone has a suggestion using Java and Spark, please help. Thanks in advance.
Best Answer
@tkkman Here is the Scala way I mentioned. The RDD way is deprecated and the DataSet approach is now recommended, so it should be straightforward to port to Java:
import spark.implicits._
import org.apache.spark.sql.functions._
val df = spark.read.option("delimiter","|").option("header","true").csv("/FileStore/tables/test.txt")
val jdf = spark.read.json(df.select("emp_json").as[String]).select(explode($"employee").alias("emp"))
  .select($"emp.name.firstName", $"emp.name.lasteName", $"emp.address.street", $"emp.address.unit", $"emp.address.city")
jdf.printSchema
jdf.withColumn("dept", lit("finance")).withColumn("location", lit("OH")).show(false)
+---------+---------+---------------------+----+--------+-------+--------+
|firstName|lasteName|street               |unit|city    |dept   |location|
+---------+---------+---------------------+----+--------+-------+--------+
|John     |Doe      |1234 West Broad St   |8505|Columbus|finance|OH      |
|Alex     |Messi    |4321 North Meecham Rd|300 |Salinas |finance|OH      |
+---------+---------+---------------------+----+--------+-------+--------+
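Since the question asks for Java specifically, here is a minimal Java sketch of the same idea using `from_json` plus `explode` (available since Spark 2.1), which avoids a second `spark.read.json` pass and keeps the `dept` and `location` CSV columns instead of hard-coding them. The path and column names are taken from the question; the class name and schema variable are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;

public class FlattenEmpJson {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("flatten-emp-json").master("local[*]").getOrCreate();

        // Read the pipe-delimited CSV from the question.
        Dataset<Row> empDetlsDS = spark.read()
                .option("header", "true")
                .option("delimiter", "|")
                .csv("inputCSVPath");

        // Schema of the JSON column, matching the sample data
        // (note: "lasteName" is spelled exactly as in the source data).
        DataType empSchema = new StructType()
            .add("employee", DataTypes.createArrayType(new StructType()
                .add("name", new StructType()
                    .add("firstName", DataTypes.StringType)
                    .add("lasteName", DataTypes.StringType))
                .add("address", new StructType()
                    .add("street", DataTypes.StringType)
                    .add("unit", DataTypes.StringType)
                    .add("city", DataTypes.StringType))));

        // Parse the JSON column, explode the employee array into one row per
        // employee, and build the requested output columns.
        Dataset<Row> flat = empDetlsDS
            .withColumn("parsed", from_json(col("emp_json"), empSchema))
            .withColumn("emp", explode(col("parsed.employee")))
            .select(
                col("dept"),
                concat_ws(" ", col("emp.name.firstName"), col("emp.name.lasteName")).alias("emp_name"),
                concat_ws(" ", col("emp.address.street"), col("emp.address.unit")).alias("emp_address"),
                col("emp.address.city").alias("emp_city"),
                col("location"));

        flat.show(false);
        spark.stop();
    }
}
```

Because `dept` and `location` come from the same CSV row as `emp_json`, this approach also works when the file contains more than one department.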
For java - How to convert input CSV data containing JSON into a Spark Dataset?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54430722/