apache-spark - 从 Kafka 主题反序列化 Spark 结构化流数据

标签 apache-spark apache-kafka spark-streaming-kafka

我正在使用 Kafka 2.3.0 和 Spark 2.3.4。我已经构建了一个 Kafka 连接器,它读取 CSV 文件并将一行从 CSV 发布到相关的 Kafka 主题。该行是这样的: “201310,XYZ001,Sup,XYZ,A,0,预售,6,标注,0,0,1,N,前景”。 CSV 包含数千行这样的行。连接器能够成功地将它们发布到主题上,我也能够在 Spark 中获取消息。我不确定如何将该消息反序列化到我的架构中?请注意,消息是 headless 的,因此 kafka 消息中的关键部分为空。值部分包括上面的完整 CSV 字符串。我的代码如下。

我看了这个 - How to deserialize records from Kafka using Structured Streaming in Java?但无法将其移植到我的 csv 案例中。此外,我尝试了其他 Spark sql 机制来尝试从“值”列中检索单个行,但无济于事。如果我确实设法获得编译版本(例如,indivValues 数据集或 dsRawData 上的映射),我会收到类似于以下内容的错误:“org.apache.spark.sql.AnalysisException:无法解析给定的 'IC'输入列:[值];”。如果我理解正确的话,那是因为 value 是一个逗号分隔的字符串,而如果我不做“某事”,spark 并不会真正神奇地为我映射它。

//build the spark session
SparkSession sparkSession = SparkSession.builder()
    .appName(seCfg.arg0AppName)
    .config("spark.cassandra.connection.host",config.arg2CassandraIp)
    .getOrCreate();

...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("timeOfOrigin",  DataTypes.TimestampType, true),
    DataTypes.createStructField("cName", DataTypes.StringType, true),
    DataTypes.createStructField("cRole", DataTypes.StringType, true),
    DataTypes.createStructField("bName", DataTypes.StringType, true),
    DataTypes.createStructField("stage", DataTypes.StringType, true),
    DataTypes.createStructField("intId", DataTypes.IntegerType, true),
    DataTypes.createStructField("intName", DataTypes.StringType, true),
    DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
    DataTypes.createStructField("catName", DataTypes.StringType, true),
    DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
    DataTypes.createStructField("opType", DataTypes.StringType, true),
    DataTypes.createStructField("opName", DataTypes.StringType, true)
    });
...

 Dataset<Row> dsRawData = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
    .option("subscribe", config.arg1TopicName)
    .option("failOnDataLoss", "false")
    .load();

//getting individual terms like '201310', 'XYZ001'.. from "values"
Dataset<String> indivValues = dsRawData
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING())
    .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());

//indivValues when printed to console looks like below which confirms that //I receive the data correctly and completely
/*
When printed on console, looks like this:
                +--------------------+
                |               value|
                +--------------------+
                |              201310|
                |              XYZ001|
                |                 Sup|
                |                 XYZ|
                |                   A|
                |                   0|
                |            Presales|
                |                   6|
                |             Callout|
                |                   0|
                |                   0|
                |                   1|
                |                   N|
                |            Prospect|
                +--------------------+
*/

StreamingQuery sq = indivValues.writeStream()
    .outputMode("append")
    .format("console")
    .start();
//await termination
sq.awaitTermination();
  • 我要求将数据输入为上面所示的自定义架构,因为我将对其运行数学计算(对于每个新行与一些旧行的组合)。
  • 在将 header 推送到主题之前,在 Kafka 连接器源任务中合成 header 是否更好?拥有标题会让这个问题的解决变得更简单吗?

谢谢!

最佳答案

根据您现有的代码,解析 dsRawData 中的输入的最简单方法就是将其转换为Dataset<String>然后使用 native csv reader api

//dsRawData has raw incoming data from Kafka...
Dataset<String> indivValues = dsRawData
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

Dataset<Row>    finalValues = sparkSession.read()
                .schema(schema)
                .option("delimiter",",")
                .csv(indivValues);

通过这样的构造,您可以使用与直接从 Spark 读取 CSV 文件时可用的完全相同的 CSV 解析选项。

关于apache-spark - 从 Kafka 主题反序列化 Spark 结构化流数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57057159/

相关文章:

java - Spark Streaming Kafka 消费者

java - 我在使用 readStream().format ("kafka"时遇到问题)

apache-spark - Spark 错误地将整数转换为 `struct<int:int,long:bigint>`

apache-spark - spark 有效地找到一组列的最频繁值

java - org.apache.kafka.streams.examples.wordcount.WordCountDemo 不会终止

java - Spring Kafka 类不在可信包中

scala - 根据对 Spark 中 PCA 的贡献选择最重要的变量

java - Spark 提交中不支持的类版本错误

java - Kafka 宕机时如何处理 IOException?

apache-spark - 如何优化 spark structured streaming app 中执行器实例的数量?