I have a JSON server log file that I want to parse with Spark 2.2.0 and the Java API. I load it into a Dataset with:
Dataset<Row> df = spark.read().json(args[0]);
This produces the following schema:
df.printSchema();
root
|-- timestamp: long (nullable = true)
|-- results: struct (nullable = true)
| |-- entities: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- entity_id: string (nullable = true)
| | | |-- score: long (nullable = true)
| | | |-- is_available: boolean (nullable = true)
| |-- number_of_results: long (nullable = true)
I want to get the entity with the lowest score that is also available, so I end up with a dataset similar to this:
root
|-- timestamp: long (nullable = true)
|-- results: struct (nullable = true)
| |-- entity: struct (containsNull = true)
| | |-- entity_id: string (nullable = true)
| | |-- score: long (nullable = true)
| | |-- is_available: boolean (nullable = true)
How can I perform this transformation?
Best answer
You can apply a user-defined function (UDF) to the array column:
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Define the UDF that picks the available element with the minimum score.
// Per the schema, field 0 is entity_id, field 1 is score, field 2 is is_available.
UDF1<Seq<Row>, Row> getElement = seq -> {
    Row bestRow = null;
    long bestRowScore = Long.MAX_VALUE;
    for (Row r : JavaConversions.seqAsJavaList(seq)) {
        if (r.getBoolean(2) && r.getLong(1) < bestRowScore) {
            bestRow = r;
            bestRowScore = r.getLong(1);
        }
    }
    return bestRow; // null if no element is available
};
// Derive the UDF's return type (the array's element struct) from the input schema
ArrayType arrayType = (ArrayType) df.select(df.col("results.entities")).schema().fields()[0].dataType();
DataType elementType = arrayType.elementType();
// Register the UDF on the same SparkSession used to read the file
spark.udf().register("getElement", getElement, elementType);
// Apply the UDF to the dataset, aliasing the result column for readability
Dataset<Row> transformedDF = df.select(df.col("timestamp"), functions.callUDF("getElement", df.col("results.entities")).alias("entity"));
transformedDF.printSchema();
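The selection loop inside the UDF can be sanity-checked without a Spark cluster. Below is a minimal plain-Java sketch of the same logic; the Entity class and the sample values are hypothetical stand-ins for one element of the entities array:

```java
import java.util.Arrays;
import java.util.List;

public class MinAvailable {
    // Hypothetical stand-in for one element of results.entities
    static final class Entity {
        final String entityId;
        final long score;
        final boolean isAvailable;
        Entity(String entityId, long score, boolean isAvailable) {
            this.entityId = entityId;
            this.score = score;
            this.isAvailable = isAvailable;
        }
    }

    // Same loop as the UDF: keep the available entity with the smallest score
    static Entity getElement(List<Entity> entities) {
        Entity best = null;
        long bestScore = Long.MAX_VALUE;
        for (Entity e : entities) {
            if (e.isAvailable && e.score < bestScore) {
                best = e;
                bestScore = e.score;
            }
        }
        return best; // null when no entity is available
    }

    public static void main(String[] args) {
        List<Entity> entities = Arrays.asList(
                new Entity("a", 5, true),
                new Entity("b", 2, false), // lower score, but not available
                new Entity("c", 3, true));
        System.out.println(getElement(entities).entityId); // prints "c"
    }
}
```

Note that, like the UDF, this returns null when no element is available, so the resulting Spark column is nullable.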
Regarding "java - Get the minimum value of a nested array in a Spark Dataset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45711368/