kafka-consumer-api - 当我们的 Kafka 分区存在滞后时,Akka Kafka Consumer 处理率急剧下降

标签 kafka-consumer-api partitioning akka-stream throughput akka-kafka

我们面临着这样一种情况:每当出现延迟时,我们的 akka-stream-kaka-consumer 处理率就会下降。当我们在分区没有任何延迟的情况下启动它时,处理速度突然增加。

MSK 集群 - 10 个主题 - 每个主题 40 个分区 => 总共 400 个领导分区

为了在系统中实现高吞吐量和并行性,我们实现了 akka-stream-kafka 消费者分别订阅每个主题分区,从而在消费者和分区之间实现 1:1 映射。

这是消费者设置:

  1. ec2 服务实例数量 - 7
  2. 每项服务为 10 个主题中的每一个主题启动 6 个使用者,从而导致每个服务实例有 60 个使用者。
  3. 消费者总数 = 实例数量 (7) * 每个服务实例上的消费者数量 (60) = 420

因此,我们总共启动了 420 个分布在不同实例中的消费者。根据 RangeAssignor 分区策略(默认策略),每个分区将分配给不同的消费者,400 个消费者将使用 400 个分区,20 个消费者将保持未使用状态。我们已经验证了这个分配,看起来不错。

使用的实例类型: c5.xlarge

MSK 配置:

Apache Kafka 版本 - 2.4.1.1

经纪商总数 - 9(分布在 3 个可用区)

经纪商类型: kafka.m5.large

每个区域的经纪商: 3

auto.create.topics.enable=true

default.replication.factor=3

min.insync.replicas=2

num.io.threads=8

num.network.threads=5

分区数量=40

num.replica.fetchers=2

replica.lag.time.max.ms=30000

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

socket.send.buffer.bytes=102400

unclean.leader.election.enable=true

zookeeper.session.timeout.ms=18000

log.retention.ms=259200000

这是我们为每个消费者使用的配置

akka.kafka.consumer {
 kafka-clients {
  bootstrap.servers = "localhost:9092"
  client.id = "consumer1"
  group.id = "consumer1"
  auto.offset.reset="latest"
 }
 aws.glue.registry.name="Registry1"
 aws.glue.avroRecordType = "GENERIC_RECORD"
 aws.glue.region = "region"
 

    kafka.value.deserializer.class="com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"

 # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
 # configured by `consumer.metadata-request-timeout`
 connection-checker {

  #Flag to turn on connection checker
  enable = true

  # Amount of attempts to be performed after a first connection failure occurs
  # Required, non-negative integer
  max-retries = 3

  # Interval for the connection check. Used as the base for exponential retry.
  check-interval = 15s

  # Check interval multiplier for backoff interval
  # Required, positive number
  backoff-factor = 2.0
 }
}

akka.kafka.committer {

 # Maximum number of messages in a single commit batch
 max-batch = 10000

 # Maximum interval between commits
 max-interval = 5s

 # Parallelism for async committing
 parallelism = 1500

 # API may change.
 # Delivery of commits to the internal actor
 # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
 # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
 delivery = WaitForAck

 # API may change.
 # Controls when a `Committable` message is queued to be committed.
 # OffsetFirstObserved: When the offset of a message has been successfully produced.
 # NextOffsetObserved: When the next offset is observed.
 when = OffsetFirstObserved
}


akka.http {
 client {
  idle-timeout = 10s
 }
 host-connection-pool {
  idle-timeout = 10s
  client {
   idle-timeout = 10s
  }
 }
}

consumer.parallelism=1500

我们使用下面的代码来实现从 Kafka 到空接收器的流

override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = system.dispatcher
val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")


val supervisionDecider: Supervision.Decider = {
 case _ => Supervision.Resume
}

val commiter = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)
Consumer
 .committableSource(consumerSettings, Subscriptions.topics(topicsName))
 .mapAsync(parallelism) {
  msg =>
   f(msg.record.key(), msg.record.value())
    .map(_ => msg.committableOffset)
    .recoverWith {
     case _ => Future.successful(msg.committableOffset)
    }
 }
 .toMat(Committer.sink(commiter).withAttributes(supervisionStrategy))(DrainingControl.apply)
 .withAttributes(supervisionStrategy)

代码中的库版本

"com.typesafe.akka" %% "akka-http"            % "10.1.11",
 "com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
 "com.typesafe.akka" %% "akka-stream" % "2.5.30"

观察结果如下,

  1. 假设在连续 1 小时的时间间隔内,只有部分消费者
    正在积极消耗延迟并以预期的速度进行处理。
  2. 在接下来的 1 小时内,其他一些消费者变得活跃且活跃
    从其分区中消耗,然后停止处理。
  3. 从 offsetLag 图表中观察到,所有滞后都在一次内被清除。

我们希望所有消费者并行运行并实时处理消息。三天的处理延迟给我们造成了严重的停机时间。我尝试按照给定的链接进行操作,但我们已经使用了固定版本 https://github.com/akka/alpakka-kafka/issues/549

任何人都可以帮助我们在消费者配置或其他问题方面缺少什么。

Graph of Offset Lag Per Partition Per Topic

最佳答案

在我看来,该滞后图表明您的整个系统无法处理所有负载,而且几乎看起来一次只有一个分区实际上正在取得进展。

这种现象向我表明,f 中进行的处理最终控制了清除某些队列的速率,并且 mapAsync 中的并行性舞台太高,有效地使分区相互竞争。由于Kafka消费者会批量记录(默认情况下以500为批处理,假设消费者的滞后超过500条记录),如果并行度高于此值,那么所有这些记录基本上会作为一个 block 同时进入队列。看起来mapAsync中的并行度是1500;考虑到 Kafka 默认 500 批处理大小的明显使用,这似乎太高了:没有理由让它大于 Kafka 批处理大小,如果您希望分区之间的消耗率更均匀,那么它应该小很多比批量大小。

如果没有 f 中发生的情况的详细信息,很难说该队列是什么以及应该减少多少并行度。但我可以分享一些一般准则:

  • 如果工作受 CPU 限制(这种情况的标志是您的使用者的 CPU 利用率非常高),则您有 7 个使用者,每个使用者有 4 个 vCPU。您无法一次物理处理超过 28 (7 x 4) 条记录,因此 mapAsync 中的并行度不应超过 1;或者您需要更多和/或更大的实例
  • 如果工作是 I/O 绑定(bind)的或以其他方式阻塞,我会小心工作正在哪个线程池/执行上下文/Akka 调度程序上完成。所有这些通常只会产生有限数量的线程,并在所有线程都忙时维护一个工作队列;该工作队列很可能是感兴趣的队列。扩大该池中的线程数量(或者如果使用默认执行上下文或默认 Akka 调度程序,将该工作负载移动到适当大小的池中)将减少队列的压力
  • 由于您包含了 akka-http,因此 f 中的消息处理可能涉及向某些其他服务发送 HTTP 请求。在这种情况下,请务必记住 Akka HTTP 为每个目标主机维护一个队列;目标端也可能有一个队列来控制那里的吞吐量。这在某种程度上是第二种(I/O 限制)情况的特殊情况。

实例上的 CPU 利用率非常低,可以证明 I/O 限制/阻塞情况。如果您要填充每个目标主机的队列,您将看到有关“超出配置的最大打开请求值”的日志消息。

另一件事值得注意的是,由于 Kafka 消费者本质上是阻塞的,Alpakka Kafka 消费者 Actor 在自己的调度程序中运行,其大小默认为 16,这意味着每个主机最多只能有 16 个消费者或生产者可以工作一次。将 akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size 设置为您的应用启动的消费者数量(每 7 个主题配置 6 个消费者中的 42 个)也许是个好主意。 Alpakka Kafka 调度程序中的线程饥饿可能会导致消费者重新平衡,从而扰乱消费。

在不进行任何其他更改的情况下,我建议,为了跨分区的消耗率更均匀,设置

akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 42
consumer.parallelism = 50

关于kafka-consumer-api - 当我们的 Kafka 分区存在滞后时,Akka Kafka Consumer 处理率急剧下降,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69750636/

相关文章:

apache-kafka - Kafka 控制台消费者错误 "Offset commit failed on partition"

mySQL通过MD5对表进行分区?

scala - 将 SSE 与 Redis 发布/订阅和 Akka Streams 一起使用的最简单方法是什么?

sql-server - 如何在 SQL Server 中结合水平和垂直分区

apache-spark - 为什么在重新分区 Spark 数据帧时会得到这么多空分区?

scala - Akka-http 在一个流中处理来自不同连接的 HttpRequests

websocket - Kafka 消息到 websocket

java - 在Java中获取Kafka未使用的消息数

apache-kafka - 无法在 Ubuntu 16.04 上安装 rdkafka

java - 当卡夫卡宕机时,卡夫卡消费者挂起投票