java - 使用 Java 中的 Apache Spark 解析带有数组的 Json 对象并映射到多个对

标签 java json apache-spark

我用谷歌搜索了一整天,但找不到直接答案,所以最终在这里发布了一个问题。

我有一个包含行分隔的 json 对象的文件:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "103b", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "103b", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

我的目标是使用 Java 中的 Apache Spark 解析此文件。我引用了How to Parsing CSV or JSON File with Apache Spark到目前为止,我可以使用 Gson 成功地将每一行 json 解析为 JavaRDD .

JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> data = sc.textFile("fileName");
JavaRDD<JsonObject> records = data.map(new Function<String, JsonObject>() {
    public JsonObject call(String line) throws Exception {
        Gson gson = new Gson();
        JsonObject json = gson.fromJson(line, JsonObject.class);
        return json;
    }
});

我真正陷入困境的是我想反序列化“rooms”数组,以便它适合我的类Event

public class Event implements Serializable {
    public static final long serialVersionUID = 42L;
    private String deviceId;
    private int timestamp;
    private String room;
    // constructor , getters and setters 
}

换句话说,从这一行开始:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}

我想在 Spark 中创建两个 Event 对象:

obj1: deviceId = "103b", timestamp = 1436941050, room = "Office"
obj2: deviceId = "103b", timestamp = 1436941050, room = "Foyer"

我做了一点搜索并尝试了flatMapVlue,但没有运气......它给了我一个错误......

JavaRDD<Event> events = records.flatMapValue(new Function<JsonObject, Iterable<Event>>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        JsonArray rooms = json.get("rooms").getAsJsonArray();
        List<Event> data = new LinkedList<Event>();
        for (JsonElement room : rooms) {
            data.add(new Event(json.get("device_id").getAsString(), json.get("timestamp").getAsInt(), room.toString()));
        }
        return data;
    }
});

我对 Spark 和 Map/Reduce 非常陌生。如果您能帮助我,我将不胜感激。提前致谢!

最佳答案

如果将 json 数据加载到 DataFrame 中:

DataFrame df = sqlContext.read().json("/path/to/json");

您可以通过explode轻松完成此操作。

df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room")
);

对于输入:

{"device_id": "1", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "2", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "3", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

您将得到:

+---------+------+----------+
|device_id|  room| timestamp|
+---------+------+----------+
|        1|Office|1436941050|
|        1| Foyer|1436941050|
|        2|Office|1435677490|
|        2|   Lab|1435677490|
|        3|Office|1436673850|
|        3| Foyer|1436673850|
+---------+------+----------+

关于java - 使用 Java 中的 Apache Spark 解析带有数组的 Json 对象并映射到多个对,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38345514/

相关文章:

java - 是-XX :MaxRAMFraction=1 safe for production in a containered environment?

java - 未调用 Cxf 异常映射器

java - 什么是 JUnit @Before 和 @Test

sql-server - 将两列转换为键值 json 对象?

pandas - Pyspark、Spark SQL 和 toPandas().shape 中计数不一致的原因是什么?

java - Spring Controller 不响应 https

ios - 检索JSON字符串中的破折号(-)之前和之后的数据?

Android:如何显示JSON Post结果

hadoop - 如何在 Hadoop 中创建从 1..N 开始的迭代器?

scala explode 方法 Cartesian product multiple array