java - 为什么 Kafka Direct Stream 会为每条消息创建一个新的解码器?

标签 java apache-spark apache-kafka spark-streaming kryo

我有一个用 Java 编写并使用 Spark 2.1 的 Spark 流应用程序。我正在使用 KafkaUtils.createDirectStream 来读取来自 Kafka 的消息。我正在为 kafka 消息使用 kryo 编码器/解码器。我在 Kafka properties-> key.deserializer, value.deserializer, key.serializer, value.deserializer

中指定了这个 当 Spark 在微批中拉取消息时,使用 kryo 解码器成功解码消息。但是我注意到 Spark 执行程序创建了一个新的 kryo 解码器实例,用于解码从 kafka 读取的每条消息。我通过将日志放入解码器构造函数来检查这一点

这对我来说似乎很奇怪。难道不应该为每条消息和每批使用相同的解码器实例吗?

我从 kafka 读取的代码:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});

最佳答案

如果我们想了解 Spark 如何在内部从 Kafka 获取数据,我们需要查看 KafkaRDD.compute,这是为每个 RDD 实现的方法告诉框架如何计算 RDD:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
    s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

这里重要的是 else 子句,它创建了一个 KafkaRDDIterator。这在内部有:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]

如您所见,它通过反射为每个底层分区 创建了键解码器和值解码器的实例。这意味着它不是每条消息生成的,而是每个 Kafka 分区生成的。

为什么要这样实现?我不知道。我假设是因为与 Spark 内部发生的所有其他分配相比,键和值解码器的性能影响应该可以忽略不计。

如果您分析了您的应用并发现这是一个分配热路径,您可以提出一个问题。否则,我不会担心。

关于java - 为什么 Kafka Direct Stream 会为每条消息创建一个新的解码器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47827902/

相关文章:

scala - KafkaIO 检查点 - 如何向 Kafka 提交偏移量

java - 使用 JFrame 调用 JComboBox

java - 如何将上面目录中的文件包含在 Maven src.xml 中?

java - 如何在调用 IOUtils.copy 后重新读取 InputStream?

java - 向mysql插入数据后,Spark Java中显示的是IterableWrapper(非空迭代器)而不是String

java - 如何在 Kafka 0.11 中正确提交生产者并消费事务消息?

java - Jackson:我怎样才能忽略 getter 而解析 setter?

scala - 具有 SASL_SSL 身份验证的 Kafka Spark 结构化流

apache-spark - 更改数据框 pyspark 中的列值

apache-spark - 如何在使用 Spark Streaming 流式传输 kafka 时对消息进行重复数据删除?