java - Extracting nested JSON values in Spark Streaming (Java)

Tags: java apache-spark spark-streaming spark-streaming-kafka

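How can I parse JSON messages coming from Kafka in Spark Streaming? I convert the JavaRDD to a Dataset and extract the values from there. Extracting top-level values works, but I cannot extract nested JSON values such as "host.name" and "fields.type".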

Incoming message from Kafka:

{
  "@timestamp": "2020-03-03T10:48:03.160Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.6.0"
  },
  "host": {
    "name": "test.com"
  },
  "agent": {
    "id": "7651453414",
    "version": "7.6.0",
    "type": "filebeat",
    "ephemeral_id": "71983698531",
    "hostname": "test"
  },
  "message": "testing",
  "log": {
    "file": {
      "path": "/test.log"
    },
    "offset": 250553
  },
  "input": {
    "type": "log"
  },
  "fields": {
    "type": "test"
  },
  "ecs": {
    "version": "1.4.0"
  }
}

Spark code:

// Schema of the raw Kafka record: the whole JSON document as one string column
StructField[] structFields = new StructField[] {
        new StructField("message", DataTypes.StringType, true, Metadata.empty()) };
StructType structType = new StructType(structFields);

// Schema passed to from_json: host and fields are declared as plain strings,
// so their nested objects come back as raw JSON text
StructField[] structFields2 = new StructField[] {
        new StructField("host", DataTypes.StringType, true, Metadata.empty()),
        new StructField("fields", DataTypes.StringType, true, Metadata.empty()),
        new StructField("message", DataTypes.StringType, true, Metadata.empty()) };
StructType structType2 = new StructType(structFields2);

// rdd: the JavaRDD<ConsumerRecord<String, String>> received from Kafka (defined elsewhere)
JavaRDD<Row> rowRDD = rdd.map(new Function<ConsumerRecord<String, String>, Row>() {
    private static final long serialVersionUID = -8817714250698168398L;

    @Override
    public Row call(ConsumerRecord<String, String> r) {
        // Wrap the record value (the JSON string) in a single-column Row
        return RowFactory.create(r.value());
    }
});
Dataset<Row> rowExtracted = spark.createDataFrame(rowRDD.rdd(), structType)
        .select(functions.from_json(functions.col("message"), structType2).as("data"))
        .select("data.*");
rowExtracted.printSchema();
rowExtracted.show((int) rowExtracted.count(), false);

printSchema output:

root
 |-- host: string (nullable = true)
 |-- fields: string (nullable = true)
 |-- message: string (nullable = true)

Actual output:

+---------------+---------------+-------+
|host           |fields         |message|
+---------------+---------------+-------+
|{"name":"test"}|{"type":"test"}|testing|
+---------------+---------------+-------+

Expected output:

+---------------+---------------+-------+
|host           |fields         |message|
+---------------+---------------+-------+
|test           |test           |testing|
+---------------+---------------+-------+
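The expected output amounts to reading host.name and fields.type out of the nested objects instead of returning the objects themselves. As an illustration only, here is a plain-Java sketch (no Spark, no JSON library) that walks a dotted path through a message already parsed into nested Maps. The class NestedPathDemo and the getPath helper are hypothetical; in Spark the corresponding step is selecting a nested struct field, for example functions.col("data.host.name").

```java
import java.util.Map;

public class NestedPathDemo {
    // Hypothetical helper: follows a dotted path such as "host.name" through nested Maps,
    // analogous to selecting a nested struct field in Spark
    static Object getPath(Map<String, Object> root, String path) {
        Object current = root;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map<?, ?> map)) {
                return null; // the path goes deeper than the data does
            }
            current = map.get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        // Subset of the Kafka message above, assumed to be already parsed into nested Maps
        Map<String, Object> message = Map.of(
                "host", Map.of("name", "test.com"),
                "fields", Map.of("type", "test"),
                "message", "testing");

        System.out.println(getPath(message, "host.name"));   // test.com
        System.out.println(getPath(message, "fields.type")); // test
        System.out.println(getPath(message, "message"));     // testing
    }
}
```

In Spark the same dotted path only resolves when every field along it is a struct; a field declared as a string has no children to walk into.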

Best answer

Declare host and fields as nested StructTypes instead of StringType:

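// Nested schema for "host": { "name": ... }
StructField[] structFieldsName = new StructField[] {
        new StructField("name", DataTypes.StringType, true, Metadata.empty())
};
StructType structTypeName = new StructType(structFieldsName);

// Nested schema for "fields": { "type": ... }
StructField[] structFieldsType = new StructField[] {
        new StructField("type", DataTypes.StringType, true, Metadata.empty())
};
StructType structTypeNested = new StructType(structFieldsType);

// Message schema: host and fields are structs, message stays a string
StructField[] structFieldsMsg = new StructField[] {
        new StructField("host", structTypeName, true, Metadata.empty()),
        new StructField("fields", structTypeNested, true, Metadata.empty()),
        new StructField("message", DataTypes.StringType, true, Metadata.empty())
};
StructType structTypeMsg = new StructType(structFieldsMsg);

// rowRDD still has one string column, so build it with structType from the question,
// parse it with the nested schema, then select the inner values by dotted path
Dataset<Row> rowExtracted = spark.createDataFrame(rowRDD.rdd(), structType)
        .select(functions.from_json(functions.col("message"), structTypeMsg).as("data"))
        .select(functions.col("data.host.name").as("host"),
                functions.col("data.fields.type").as("fields"),
                functions.col("data.message").as("message"));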
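Before running the stream, it can help to know what shape structTypeMsg should have. The sketch below is a hypothetical plain-Java mock (the Field record and render method are not Spark APIs) that models structTypeMsg and prints it in the tree layout that printSchema() and StructType.printTreeString() use for nested structs, so host and fields show up as struct with one child each instead of string.

```java
import java.util.List;

public class SchemaShapeDemo {
    // Hypothetical stand-in for a StructField: a "string" leaf or a "struct" with children.
    // Not Spark's API; it only models the shape of structTypeMsg.
    record Field(String name, String type, List<Field> children) {
        static Field leaf(String name) {
            return new Field(name, "string", List.of());
        }

        static Field struct(String name, Field... children) {
            return new Field(name, "struct", List.of(children));
        }
    }

    // Renders fields as a tree: each nesting level adds "    |" to the prefix
    static String render(List<Field> fields) {
        StringBuilder sb = new StringBuilder("root\n");
        append(sb, fields, " |");
        return sb.toString();
    }

    private static void append(StringBuilder sb, List<Field> fields, String prefix) {
        for (Field f : fields) {
            sb.append(prefix).append("-- ").append(f.name()).append(": ")
              .append(f.type()).append(" (nullable = true)\n");
            append(sb, f.children(), prefix + "    |");
        }
    }

    public static void main(String[] args) {
        // Mirrors structTypeMsg: host and fields are structs, message stays a string
        List<Field> structTypeMsg = List.of(
                Field.struct("host", Field.leaf("name")),
                Field.struct("fields", Field.leaf("type")),
                Field.leaf("message"));
        System.out.print(render(structTypeMsg));
    }
}
```

Running main prints:

```
root
 |-- host: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |-- message: string (nullable = true)
```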

Regarding java - Extracting nested JSON values in Spark Streaming (Java), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60519018/
