apache-kafka - Spark Streaming - 是否可以使用 Kafka 主题的特定分区?

标签 apache-kafka spark-streaming

我正在尝试使用 Spark Streaming 使用 Kafka 主题的特定分区。

我在 KafkaUtils 中没有看到此用例的任何方法类(class)。

有一个方法叫createRDD ,这基本上是在期待 offsets它仅对非流式应用程序有用。有没有其他方法可以使用 Spark Streaming 使用 Kafka 主题的特定分区?

最佳答案

没有办法使用单个分区,我们可以使用的最细粒度的是一个主题。但是,有一种方法可以指定给定消息源自特定分区。您可以在使用 createDirectStream 的重载时执行此操作这需要一个 Function1[MessageAndMetadata, R] .

例如,假设我们有一个类型为 String 的 key 和消息。 ,而且我们目前只从一个主题中消费。我们可以做的:

val topicAndPartition: Map[TopicAndPartition, Long] = ???
val kafkaProperties: Map[String, String] = ???

KafkaUtils.createDirectStream[String,
                              String, 
                              StringDecoder,
                              StringDecoder,
                              (String, String)](
        streamingContext,
        kafkaConfig.properties,
        topicAndPartition,
        (mam: MessageAndMetadata[String, String]) =>
          (mam.partition, mam.message())

这样,我输出了分区 (1) 和底层消息 (2) 的元组。然后,我可以过滤这个 DStream[(String, String)]只包含来自特定分区的消息:
val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }

如果我们从多个主题中消费,我们需要输出一个主题和分区的元组,以便用正确的主题过滤分区。幸运的是,已经有一个方便的案例类叫做 TopicAndPartition我们可以用。我们会有:
(mam: MessageAndMetadata[String, String]) => 
  (TopicAndPartition(mam.topic(), mam.partition()), mam.message())

进而:
val filteredStream = kafkaDStream.filter { 
   case (tap, _) => tap.topic == "mytopic" && tap.partition == 4 
}

关于apache-kafka - Spark Streaming - 是否可以使用 Kafka 主题的特定分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40025161/

相关文章:

scala - 来自多个远程主机的网络Spark流

amazon-web-services - Amazon EMR 和 Spark 流

apache-spark - 如何使用模式推断将 RDD[String] 写入 Parquet 文件?

python - 使用 kafka 0.8.2.0 跟踪主题大小和消费者滞后

python - 将消息从容器发布到在容器外部运行的kafka

java - 来自 kafka 消费者的 InstanceAlreadyExistsException

apache-spark - 在 PySpark Structured Streaming 中对多个输出流使用单个流式 DataFrame

cassandra - spark-cassandra-connector 性能 : executors seem to be idle

apache-spark - 如何将数据从 Kafka 传递到 Spark Streaming?

apache-kafka - Kafka - 为什么没有选出新的主题分区领导者?