我正在使用 scala 并使用以下 Spark Streaming 方法从 Kafka 消费数据:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
上面的变量返回 InputDStream,通过它我可以使用下面的代码查看原始/二进制格式的数据: println(行)
但我需要在原始/二进制格式上应用 avro 格式(可用模式),以便以预期的 json 格式查看数据。为了应用 avro 格式,我需要将上面的 InputDStream 转换为 avro 使用的 Array[Bytes]。
有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?
或者
如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。
最佳答案
您需要做两件事。第一种是为 Kafka 使用 DefaultDecoder
,它为您提供一个 Array[Byte]
作为值类型:
val lines: DStream[(String, Array[Byte])] =
KafkaUtils
.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
然后您需要通过额外的 map
应用您的 Avro 反序列化逻辑:
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
avroDeserializer
是您的任意类,它知道如何从 Avro 字节创建您的类型。
我个人使用avro4s通过宏获取案例类反序列化。
关于scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42213761/