找到了一些关于 Scala 的想法,但无法在 Java 中成功实现,因此作为一个新问题发布。
我需要格式化来自 Kafka 主题的“值”列中的输入 JSON
Dataset<Row> output = df.select(functions.from_json(df.col("value"), schema));
StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);
到达可以看到下面打印在控制台水槽上的点 -
StreamingQuery query = output.writeStream().format("console").start();
+-------------------------+
| jsontostructs(value)|
+-------------------------+
| [] |
+-------------------------+
请告知如何从此结构中获取各个列。
最佳答案
所以基本上需要结合使用“from_json”函数和schema.json()函数来获取String schema(类似于上面Filip在scala中提到的)。希望对某人有所帮助。
StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);
Dataset<Row> output = df.select(from_json(df.col("value"), DataType.fromjson(schema.json())).as("data")).select("data.*");
最后的选择会将结构展平到直接在架构下定义的字段中。
关于java - 在 JAVA 中使用 Kafka JSON 输入格式进行 Spark 结构化流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50112163/