apache-spark - 如何将字节从 Kafka 转换为原始对象?

标签 apache-spark apache-kafka spark-streaming spark-avro

我正在从 Kafka 获取数据,然后反序列化 Array[Byte]使用默认解码器,然后我的 RDD 元素看起来像 (null,[B@406fa9b2) , (null,[B@21a9fe0)但我想要我的原始数据有一个架构,那么我该如何实现呢?

我以 Avro 格式序列化消息。

最佳答案

您必须使用适当的解串器对字节进行解码,比如字符串或您的自定义对象。

如果你不做解码,你会得到 [B@406fa9b2这只是 Java 中字节数组的文本表示。

Kafka 对消息的内容一无所知,因此它将字节数组从生产者传递给消费者。

在 Spark Streaming 中,您必须对键和值使用序列化程序(引用 KafkaWordCount example):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

使用上述序列化程序,您将获得 DStream[String]所以你和 RDD[String] 一起工作.

但是,如果您想直接将字节数组反序列化为自定义类,则必须编写自定义 Serializer (这是 Kafka 特有的,与 Spark 无关)。

我建议使用具有固定架构或 Avro 的 JSON(使用 Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages 中描述的解决方案)。

Structured Streaming但是管道可能如下所示:

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string") // <-- conversion here

关于apache-spark - 如何将字节从 Kafka 转换为原始对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44283099/

相关文章:

mysql - 如何将 Apache Spark 与 MySQL 集成以将数据库表作为 spark 数据框读取?

hadoop - 当 rolloverSize 设置为 150 MB 时,每隔几秒就会刷新一次 Flume 消息

apache-spark - 当我们在 Apache Spark 中使用时,找不到 Set([TOPIC NAME,0])) 的领导者

apache-spark - Spark 流式传输示例对我不起作用 : Network word count (maybe Data not getting streamed)

apache-spark - 如何在 Spark Kafka 直接流中手动提交偏移量?

python - 如何在集群上保存文件

python Spark 替代非常大的数据爆炸

java - 将 Kafka 输入流动态连接到多个输出流

azure - 无法连接到在 Azure 应用服务上运行的 KAFKA

multithreading - java.util.ConcurrentModificationException : KafkaConsumer is not safe for multi-threaded access