json - 结构化流如何动态解析kafka的json数据

标签 json apache-spark spark-structured-streaming

我正在尝试使用结构化流从 Kafka 读取数据。从kafka接收到的数据为json格式。
我的代码如下:
在代码中,我使用 from_json 函数将 json 转换为数据帧以进行进一步处理。

val **schema**: StructType = new StructType()
    .add("time", LongType)
    .add(id", LongType)
    .add("properties",new StructType()
      .add("$app_version", StringType)
      .
      .
    )
val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","...")
    .option("subscribe","...")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), **schema**))

我的问题是,如果字段增加,
我无法停止 spark 程序手动添加这些字段,
那么如何动态解析这些字段,我试过 schema_of_json() ,
只能用第一行来推断字段类型,不适用于多级嵌套结构的json数据。

最佳答案

My problem is that if the field is increased, I can't stop the spark program to manually add these fields, then how can I parse these fields dynamically



开箱即用的 Spark Structured Streaming(甚至 Spark SQL)是不可能的。不过有几个解决方案。

更改代码中的架构并恢复流式查询

您只需要停止流式查询,更改代码以匹配当前模式,然后恢复它。在 Spark Structured Streaming 中可以使用支持从检查点恢复的数据源。 Kafka 数据源确实支持它。

用户定义函数 (UDF)

您可以编写一个用户定义函数 (UDF) 来为您进行动态 JSON 解析。这也是最简单的选择之一。

新数据源(MicroBatchReader)

另一种选择是为内置的 Kafka 数据源创建一个扩展,它可以进行动态 JSON 解析(类似于 Kafka 反序列化器)。这需要更多的开发,但肯定是可行的。

关于json - 结构化流如何动态解析kafka的json数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58386798/

相关文章:

java - 同步retrofit中如何获取http响应状态2

java - 通过分割字符串创建 JSON?

json - JQuery、JSON、Spring MVC - 动态加载选择选项

java - Spark 结构化流中的 MQ 源

pyspark - 如何加载卡夫卡已经发布的所有记录?

java - 将 JSON 传递给 WebService

scala - 如何使用Spark/Scala展平集合?

scala - flatmap 是否比 filter+map 提供更好的性能?

scala - 使用 Spark hadoop API 创建 RDD 以访问 Cassandra DB

apache-spark - 触发流无法写入hdfs路径