scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes]

标签 scala apache-spark apache-kafka spark-streaming avro

我正在使用 scala 并使用以下 Spark Streaming 方法从 Kafka 消费数据:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

上面的变量返回 InputDStream,通过它我可以使用下面的代码查看原始/二进制格式的数据: println(行)

但我需要在原始/二进制格式上应用 avro 格式(可用模式),以便以预期的 json 格式查看数据。为了应用 avro 格式,我需要将上面的 InputDStream 转换为 avro 使用的 Array[Bytes]。

有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?

或者

如果您知道在 InputDStream(of spark Streaming)上应用 avro 模式的更好方法,请分享。

最佳答案

您需要做两件事。第一种是为 Kafka 使用 DefaultDecoder,它为您提供一个 Array[Byte] 作为值类型:

val lines: DStream[(String, Array[Byte])] = 
  KafkaUtils
   .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

然后您需要通过额外的 map 应用您的 Avro 反序列化逻辑:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

avroDeserializer 是您的任意类,它知道如何从 Avro 字节创建您的类型。

我个人使用avro4s通过宏获取案例类反序列化。

关于scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42213761/

相关文章:

apache-kafka - 在 Windows 10 中启动 kafka 服务器时出错

apache-kafka - 在 Kafka 中读取字段 'topic_metadata' 时出错

scala - Sbt:为什么它需要 Scala-lang 2.10.3?

apache-spark - Spark连续处理模式不读取所有kafka主题分区

scala - 我应该使用哪个适用于 Spark 2.0 的 HBase 连接器?

apache-spark - 如何为 Kafka Connect 和 Spark 注册和使用 AVRO Schema?

java - kafka自定义消费者读取传入记录

Scala:Hello World脚本不起作用

scala - 链接路径相关类型并在 Scala 中具有不同参数列表时实例化它们

scala - 如何通过.map在另一个RDD中传递一个RDD