java - 如何将 Kafka 数据源中的值转换为给定的模式?

标签 java apache-spark apache-spark-sql spark-structured-streaming

我通过以下代码从kafka服务器获取日志:

    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", mykey.Kafka_source)
            .option("subscribe", mykey.Kafka_topic)
            .load();

    Dataset<String> dg = df
            .selectExpr("CAST(value AS STRING)")
            .as(STRING());

然而,dg 的一个元素是这样的“姓名:John Doe,年龄:20”,但它只有一个键“值”。因此,当我将其保存在 HDFS 中时,它的保存方式如下:“值:”姓名:John Doe,年龄:22”。但是,我想像这样更改架构:

root  
|-- name: string (nullable = true)  
|-- age: string (nullable = true)  

这样元素就被保存为“姓名:John Doe,年龄:22”

当前元素的架构如下:

root  
|-- value: string (nullable = true)

我尝试编写代码将 dg 的每个元素转换为 Dataset 的新元素,但我认为 Java 中的结构化流不支持高级函数表达式。我怎样才能做到这一点..?我想要一些使用 StructType 的解决方案。

最佳答案

您只需将转换为预期的架构即可。

如果值采用 JSON 格式,您可以使用 from_json 之一标准功能:

from_json(e: Column, schema: Column): Column

对于其他格式,您必须应用转换(带或不带 UDF)来进行转换。

关于java - 如何将 Kafka 数据源中的值转换为给定的模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57163516/

相关文章:

apache-spark - 了解 spark.yarn.executor.memoryOverhead

scala - Spark未使用所有已配置的内存

sql - 如何在 Spark SQL 中为posexplode 列指定别名?

tsql - Pyspark 中的多个 WHEN 条件实现

java - Hibernate:避免集合的隐式初始化

c# - 正则表达式在 C# 中匹配,但在 java 中不匹配

java - Spark 3.1.2 NoSuchMethodError : org. apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression$default$2()Lscala/Option;

scala - 如何在 Spark SQL 中定义和使用用户定义的聚合函数?

java - 如何防止Spring使用测试中的@Configuration?

java - 有谁知道如何实现类似于Google App Engine 的白名单类访问方法?