apache-spark - 嵌套列上的 DataFrame partitionBy

标签 apache-spark apache-spark-sql spark-dataframe

我正在尝试在如下嵌套字段上调用 ​​partitionBy:

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("data.dataDetails.name").parquet(filenameParquet)

运行时出现以下错误。我确实看到“名称”列为以下架构中的字段。是否有不同的格式来指定嵌套的列名?

java.lang.RuntimeException: Partition column data.dataDetails.name not found in schema StructType(StructField(name,StringType,true), StructField(time,StringType,true), StructField(data,StructType(StructField(dataDetails,StructType(StructField(name,StringType,true), StructField(id,StringType,true),true)),true))



这是我的 json 文件:
{  
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "EventName"
      "id": "1234"

    }
  }
} 

最佳答案

这似乎是此处列出的已知问题:https://issues.apache.org/jira/browse/SPARK-18084

我也遇到了这个问题,为了解决这个问题,我能够在我的数据集上取消嵌套列。我的数据集与您的数据集略有不同,但这是策略...

原始 Json:

{  
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "EventName"
      "id": "1234"

    }
  }
} 

修改后的 Json:
{  
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data_type": "EventData",
  "data_dataDetails_name" : "EventName",
  "data_dataDetails_id": "1234"
  }
} 

获取修改后的 Json 的代码:
def main(args: Array[String]) {
  ...

  val data = df.select(children("data", df) ++ $"name" ++ $"time"): _*)

  data.printSchema

  data.write.partitionBy("data_dataDetails_name").format("csv").save(...)
}

def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}").alias(s"$colname" + s"_" + s"${x.name}"))
}

关于apache-spark - 嵌套列上的 DataFrame partitionBy,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38319540/

相关文章:

scala - 如何在spark中使用窗口函数过滤数据

hadoop - 配置Hadoop以使用启用S3请求者付款的功能

apache-spark - Pyspark UDF 在两列之间返回类似于 groupby().sum() 的结果

apache-spark - 从嵌套数组和结构 Spark 中提取值

apache-spark - SparkSQL/配置单元 : equivalent of MySQL's `information_schema.table.{data_length, table_rows}` ?

apache-spark - Spark - 如何使用 - Thrift - Hive Interactor 获取逻辑/物理查询执行

python - 如何在 IBM 的 Spark 服务上列出预装的 Python 包

apache-spark - 如何从Spark RDD中的特定分区获取数据?

scala - 给定 Elasticsearch 无效模式

apache-spark - Apache Spark 数据集 API : head(n:Int) vs take(n:Int)