java - 获取 Spark 数据集中嵌套数组的最小值

标签 java json apache-spark

我有一个 JSON 服务器日志文件,我想使用 Spark 2.2.0 和 Java API 对其进行解析,我使用

将其转换为数据集
Dataset<Row> df = spark.read().json(args[0]);

然后,它生成以下架构:

df.printschema();

root
|-- timestamp: long (nullable = true)
|-- results: struct (nullable = true)
|    |-- entities: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- entity_id: string (nullable = true)
|    |    |    |-- score: long (nullable = true)
|    |    |    |-- is_available: boolean (nullable = true)
|    |-- number_of_results: long (nullable = true)

我想获取得分最低且可用的实体,因此我会得到类似于以下内容的数据集:

 root
 |-- timestamp: long (nullable = true)
 |-- results: struct (nullable = true)
 |    |-- entity: struct (containsNull = true)
 |    |    |-- entity_id: string (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- is_available: boolean (nullable = true)

我该如何进行这种转变?

最佳答案

您可以在数组列上应用用户定义的函数:

// Define the UDF that takes the min of array
UDF1<Seq<Row>, Row> getElement = seq -> {
    Row bestRow = null;
    long bestRowScore = Long.MAX_VALUE;
    for (Row r : JavaConversions.seqAsJavaList(seq)){
        if (r.getBoolean(1) && r.getLong(2)<bestRowScore){
            bestRow = r;
            bestRowScore = r.getLong(2);
        }
    }
    return bestRow;
};

// Define the return type of UDF
ArrayType arrayType = (ArrayType) df.select(df.col("results.entities")).schema().fields()[0].dataType();
DataType elementType = arrayType.elementType();

// Register UDF
sparkSession.udf().register("getElement", getElement, elementType);

// Apply UDF on dataset
Dataset<Row> transformedDF = df.select(df.col("timestamp"),functions.callUDF("getElement", df.col("results.entities")));
transformedDF.printSchema();

关于java - 获取 Spark 数据集中嵌套数组的最小值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45711368/

相关文章:

java - 比 XHTMLRenderer+iText 更有效的将 HTML 转换为 PDF 的方法

java - Jetty:套接字连接作为websocket连接

java - 如何设置 Spark 执行器的数量?

java - RDS 到 S3 - 数据转换 AWS

apache-spark - 如何将多行标签 xml 文件转换为 dataframe

java - 无法访问带注释的 Spring 服务

java - JDBC 附加程序的 Log4j2 MDC 配置

json - 如何在 Apps 脚本内循环 JSON 响应

处理 HTTPRequest(JSON) 时,iOS 应用程序有时会崩溃

java - 如何编写将 json 响应与场景大纲表进行比较的步骤定义