java - 在 JAVA 中使用 Kafka JSON 输入格式进行 Spark 结构化流式传输

标签 java json spark-streaming spark-structured-streaming

找到了一些关于 Scala 的想法,但无法在 Java 中成功实现,因此作为一个新问题发布。

我需要格式化来自 Kafka 主题的“值”列中的输入 JSON

Dataset<Row> output = df.select(functions.from_json(df.col("value"), schema));

StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);

到达可以看到下面打印在控制台水槽上的点 -

StreamingQuery query = output.writeStream().format("console").start();

+-------------------------+ 
|     jsontostructs(value)|
+-------------------------+
|                    []   |
+-------------------------+

请告知如何从此结构中获取各个列。

最佳答案

所以基本上需要结合使用“from_json”函数和schema.json()函数来获取String schema(类似于上面Filip在scala中提到的)。希望对某人有所帮助。

StructType schema = new StructType();
schema.add("Id", DataTypes.StringType);
schema.add("Type", DataTypes.StringType);
schema.add("KEY", DataTypes.StringType);
schema.add("condition", DataTypes.IntegerType);
schema.add("seller_Id", DataTypes.IntegerType);
schema.add("seller_Name", DataTypes.StringType);
schema.add("isActive", DataTypes.BooleanType);

Dataset<Row> output = df.select(from_json(df.col("value"), DataType.fromjson(schema.json())).as("data")).select("data.*");

最后的选择会将结构展平到直接在架构下定义的字段中。

关于java - 在 JAVA 中使用 Kafka JSON 输入格式进行 Spark 结构化流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50112163/

相关文章:

scala - NoClassDefFoundError : Could not initialize XXX class after deploying on spark standalone cluster

java - 如何从 Java (Android) 中的 XML 节点获取标记 'Name'

java - 如何在 XML 中引用 DTD 值的路径

java - 如何在 Spring Data Rest 中为实体自定义模式

java - 在 BroadcastReceiver 类中打开 hibernate 的 Android 设备

jquery - jqGrid - 网格列的 JSON 响应中的 "editable":"true"

amazon-web-services - Spark Streaming 使用 S3 与 Kinesis

apache-spark - 群组成员支持的协议(protocol)与现有成员的协议(protocol)不兼容

python - 为什么 Django 将 postgres JSONField 值作为字符串返回?

javascript - 使用 JSON 显示 html 链接