scala - Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

标签 scala apache-spark apache-kafka spark-structured-streaming spark-kafka-integration

如果我的 Kafka 主题收到类似的记录

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....

我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:
val spark = SparkSession 
      .builder 
      .appName("TestPartition") 
      .master("local[*]") 
      .getOrCreate() 

    import spark.implicits._ 

    val dataFrame = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", 
      "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092") 
      .option("subscribe", "partition_test") 
      .option("failOnDataLoss", "false") 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      // I will use a custom UDF to transform to a specific object

目前,我使用 foreachwriter 处理记录如下:
val writer = new ForeachWriter[testRec] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    def process(record: testRec) = {
      handle(record)
    }
    def close(errorOrNull: Throwable): Unit = {
    }
  }

  val query = dataFrame.writeStream
    .format("console")
    .foreach(writer)
    .outputMode("append")
    .start()

代码工作得很好。但是,我想做的是按 channel 对传入数据进行分区,以便每个工作人员负责特定的 channel ,并且我在 handle() 块内进行与该 channel 相关的内存计算。那可能吗 ?如果是,我该怎么做?

最佳答案

该代码适用于 handle记录级别的方法,独立于记录的分区。
我看到两个选项可以确保同一 channel 的所有消息都将在同一个执行器上处理:

  • 如果您可以控制 KafkaProducer 将数据生成到主题“partition_test”中,您可以设置值 channel作为Kafka消息的key。默认情况下,KafkaProducer 使用 key 来定义数据写入的分区。这将确保具有相同键的所有消息都将落在同一个 Kafka 主题分区中。由于使用 Kafka 主题的 Spark Structured Streaming 作业将匹配 Kafka 分区,因此您的结果 dataFrame将具有与 Kafka 主题相同数量的分区,并且同一 channel 的所有消息都在同一分区中。
  • 正如评论中已经写的那样,您可以简单地重新分区您的 dataFrame基于列 channel 的值通过做dataFrame.repartition(n, col("columnName")) ,其中 n是分区数。这样,具有相同 channel 的所有记录都将落在同一个分区中,因此在同一个执行器上处理。

  • 两个重要的注意事项:
  • 获得分区(数据帧或 Kafka 主题)的所有权需要一些额外的关注,因为您最终可能会遇到所谓的“数据倾斜”。与只有几条消息的分区相比,当您有包含大量消息的分区时,就会发生数据倾斜。这将对您的整体表现产生负面影响。
  • 只要您使用的是 foreach输出接收器 无论如何,在记录级别处理数据时,您的数据如何分区都无关紧要。如果您正在寻找更多控制权,您可能宁愿使用 foreachBatch接收器(在 Spark 2.4+ 中可用)。 foreachBatch 输出接收器使您可以控制每个微批次的批次数据帧,您可以使用 foreachPartitions 执行基于分区的逻辑。或 mapPartitions .
  • 关于scala - Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49951022/

    相关文章:

    docker - 无法使用Sarama Golang软件包创建Kafka生产者客户端-“客户端/元数据在获取元数据时从代理处出错:EOF”

    apache-spark - 带有 Kafka SASL/PLAIN 身份验证的 Spark 结构化流

    mysql - 光滑的动态分组

    python - 在 Pyspark 中拆分 RDD 分区中的数组

    scala - 如何附加到 Scala 中的文件?

    apache-spark - Spark-具有与RDD之后自动创建的文本文件同名的文件夹吗?

    apache-spark - Spark 是否可以设置默认存储级别?

    java - 分别运行具有不同主题的 2 个消费者时出现 Kafka CommitFailedException

    Scala mergeMsg 与 def

    scala - 有没有办法使用对象的类型作为类型参数的参数?