我们面临着这样一种情况:每当出现延迟时,我们的 akka-stream-kaka-consumer 处理率就会下降。当我们在分区没有任何延迟的情况下启动它时,处理速度突然增加。
MSK 集群 - 10 个主题 - 每个主题 40 个分区 => 总共 400 个领导分区
为了在系统中实现高吞吐量和并行性,我们实现了 akka-stream-kafka 消费者分别订阅每个主题分区,从而在消费者和分区之间实现 1:1 映射。
这是消费者设置:
- ec2 服务实例数量 - 7
- 每项服务为 10 个主题中的每一个主题启动 6 个使用者,从而导致每个服务实例有 60 个使用者。
- 消费者总数 = 实例数量 (7) * 每个服务实例上的消费者数量 (60) = 420
因此,我们总共启动了 420 个分布在不同实例中的消费者。根据 RangeAssignor 分区策略(默认策略),每个分区将分配给不同的消费者,400 个消费者将使用 400 个分区,20 个消费者将保持未使用状态。我们已经验证了这个分配,看起来不错。
使用的实例类型: c5.xlarge
MSK 配置:
Apache Kafka 版本 - 2.4.1.1
经纪商总数 - 9(分布在 3 个可用区)
经纪商类型: kafka.m5.large
每个区域的经纪商: 3
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
分区数量=40
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.ms=259200000
这是我们为每个消费者使用的配置
akka.kafka.consumer {
kafka-clients {
bootstrap.servers = "localhost:9092"
client.id = "consumer1"
group.id = "consumer1"
auto.offset.reset="latest"
}
aws.glue.registry.name="Registry1"
aws.glue.avroRecordType = "GENERIC_RECORD"
aws.glue.region = "region"
kafka.value.deserializer.class="com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"
# Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
# configured by `consumer.metadata-request-timeout`
connection-checker {
#Flag to turn on connection checker
enable = true
# Amount of attempts to be performed after a first connection failure occurs
# Required, non-negative integer
max-retries = 3
# Interval for the connection check. Used as the base for exponential retry.
check-interval = 15s
# Check interval multiplier for backoff interval
# Required, positive number
backoff-factor = 2.0
}
}
akka.kafka.committer {
# Maximum number of messages in a single commit batch
max-batch = 10000
# Maximum interval between commits
max-interval = 5s
# Parallelism for async committing
parallelism = 1500
# API may change.
# Delivery of commits to the internal actor
# WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
# SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
delivery = WaitForAck
# API may change.
# Controls when a `Committable` message is queued to be committed.
# OffsetFirstObserved: When the offset of a message has been successfully produced.
# NextOffsetObserved: When the next offset is observed.
when = OffsetFirstObserved
}
akka.http {
client {
idle-timeout = 10s
}
host-connection-pool {
idle-timeout = 10s
client {
idle-timeout = 10s
}
}
}
consumer.parallelism=1500
我们使用下面的代码来实现从 Kafka 到空接收器的流
override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = system.dispatcher
val topicsName = "Set of Topic Names"
val parallelism = conf.getInt("consumer.parallelism")
val supervisionDecider: Supervision.Decider = {
case _ => Supervision.Resume
}
val commiter = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicsName))
.mapAsync(parallelism) {
msg =>
f(msg.record.key(), msg.record.value())
.map(_ => msg.committableOffset)
.recoverWith {
case _ => Future.successful(msg.committableOffset)
}
}
.toMat(Committer.sink(commiter).withAttributes(supervisionStrategy))(DrainingControl.apply)
.withAttributes(supervisionStrategy)
代码中的库版本
"com.typesafe.akka" %% "akka-http" % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream" % "2.5.30"
观察结果如下,
- 假设在连续 1 小时的时间间隔内,只有部分消费者
正在积极消耗延迟并以预期的速度进行处理。 - 在接下来的 1 小时内,其他一些消费者变得活跃且活跃
从其分区中消耗,然后停止处理。 - 从 offsetLag 图表中观察到,所有滞后都在一次内被清除。
我们希望所有消费者并行运行并实时处理消息。三天的处理延迟给我们造成了严重的停机时间。我尝试按照给定的链接进行操作,但我们已经使用了固定版本 https://github.com/akka/alpakka-kafka/issues/549
任何人都可以帮助我们在消费者配置或其他问题方面缺少什么。
最佳答案
在我看来,该滞后图表明您的整个系统无法处理所有负载,而且几乎看起来一次只有一个分区实际上正在取得进展。
这种现象向我表明,f
中进行的处理最终控制了清除某些队列的速率,并且 mapAsync
中的并行性舞台太高,有效地使分区相互竞争。由于Kafka消费者会批量记录(默认情况下以500为批处理,假设消费者的滞后超过500条记录),如果并行度高于此值,那么所有这些记录基本上会作为一个 block 同时进入队列。看起来mapAsync
中的并行度是1500;考虑到 Kafka 默认 500 批处理大小的明显使用,这似乎太高了:没有理由让它大于 Kafka 批处理大小,如果您希望分区之间的消耗率更均匀,那么它应该小很多比批量大小。
如果没有 f
中发生的情况的详细信息,很难说该队列是什么以及应该减少多少并行度。但我可以分享一些一般准则:
- 如果工作受 CPU 限制(这种情况的标志是您的使用者的 CPU 利用率非常高),则您有 7 个使用者,每个使用者有 4 个 vCPU。您无法一次物理处理超过 28 (7 x 4) 条记录,因此 mapAsync 中的并行度不应超过 1;或者您需要更多和/或更大的实例
- 如果工作是 I/O 绑定(bind)的或以其他方式阻塞,我会小心工作正在哪个线程池/执行上下文/Akka 调度程序上完成。所有这些通常只会产生有限数量的线程,并在所有线程都忙时维护一个工作队列;该工作队列很可能是感兴趣的队列。扩大该池中的线程数量(或者如果使用默认执行上下文或默认 Akka 调度程序,将该工作负载移动到适当大小的池中)将减少队列的压力
- 由于您包含了
akka-http
,因此f
中的消息处理可能涉及向某些其他服务发送 HTTP 请求。在这种情况下,请务必记住 Akka HTTP 为每个目标主机维护一个队列;目标端也可能有一个队列来控制那里的吞吐量。这在某种程度上是第二种(I/O 限制)情况的特殊情况。
实例上的 CPU 利用率非常低,可以证明 I/O 限制/阻塞情况。如果您要填充每个目标主机的队列,您将看到有关“超出配置的最大打开请求值”的日志消息。
另一件事值得注意的是,由于 Kafka 消费者本质上是阻塞的,Alpakka Kafka 消费者 Actor 在自己的调度程序中运行,其大小默认为 16,这意味着每个主机最多只能有 16 个消费者或生产者可以工作一次。将 akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size
设置为您的应用启动的消费者数量(每 7 个主题配置 6 个消费者中的 42 个)也许是个好主意。 Alpakka Kafka 调度程序中的线程饥饿可能会导致消费者重新平衡,从而扰乱消费。
在不进行任何其他更改的情况下,我建议,为了跨分区的消耗率更均匀,设置
akka.kafka.default-dispatcher.thread-pool-executor.fixed-pool-size = 42
consumer.parallelism = 50
关于kafka-consumer-api - 当我们的 Kafka 分区存在滞后时,Akka Kafka Consumer 处理率急剧下降,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69750636/