java - 列出当前组配置并更新配置

标签 java apache-kafka

我们正在使用 kafka 来实现驱动事件应用程序,并在组中的消费者之间进行大量的重新平衡。

我们每次轮询最多 100 个事件,事件处理需要 2-10 分钟。 我们为每条消息保留 TTL,一段时间后,大多数消息都会过期(消息被消耗需要超过 1.5 小时) 目前,我们在一小时内有大约 10000 条关于该主题的消息和 3 个消费者。 我们看到的行为是,虽然一小时内生成了 10000 条消息,但我们在这段时间内消费了 25000 条消息,但同一条消息被多个消费者消费。 我们正在使用默认的提交策略。

我们得到了很多:

失败:提交无法完成,因为组已重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着 poll 循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。

我发现:

CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

我想尝试一下,但我不知道如何在代理中配置 group.max.session.timeout.ms 。

我还发现:

Why can't I increase session.timeout.ms?

如何获取此问题中所述的 ConsumerConfig 内容?

谢谢, 艾隆

最佳答案

我不确定您启动经纪商和客户端的方式,但希望您使用 kafka 的 bin 文件夹中的脚本通过控制台启动它们,这些是您应该遵循的步骤:

  • 经纪商方

您应该增加group.max.session.timeout.ms参数,例如,增加到双倍(??)。此参数定义,如 latest version of the documentation 中所述。 , 注册消费者允许的最大 session 超时。较长的超时使消费者有更多的时间在心跳之间处理消息,但代价是检测故障的时间更长。当您启动 Kafka 代理时,最简单的命令如下:

bin/kafka-server-start.sh config/server.properties

您必须通过添加参数来修改该 config/server.properties 文件。仅作为示例(最好将其定位到更下方..):

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker. 
broker.id=0

###group session timeout! yep, this one
group.max.session.timeout.ms=3600000  //(default is 1800000)

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from

...

重新启动 Kafka,代理端就完成了。

  • 消费者端

正如您所知,现在是时候修改客户端的 session.timeout.ms 参数了。此参数定义(这是一个很长的参数):

使用 Kafka 的组管理工具时用于检测消费者故障的超时。消费者定期发送心跳以向代理表明其活跃度。如果在此 session 超时到期之前代理没有收到心跳,则代理将从组中删除该消费者并启动重新平衡。请注意,该值必须在代理配置中 group.min.session.timeout.ms 和 group.max.session.timeout.ms 配置的允许范围内。

当您启动消费者时,最简单的命令如下:

bin/kafka-console-consumer.sh --consumer.config config/myconsumer.properties

在您的消费者属性文件中,您应该添加/修改参数。例如:

(...)
##Consumer session timeout! 
session.timeout.ms=20000 //(default is 10000)
(...)

通常,Consumer API 由客户端启动( java,...),并通过读取程序的启动参数来加载属性。重新启动消费者即可完成。

也许偏离主题,但在 kafka 的配置调整之外,您还可以检查是否有可能解耦消费和处理(例如,通过使用某种资源轮询)。

希望对你有帮助!

关于java - 列出当前组配置并更新配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57316433/

相关文章:

docker - 在自定义网络中访问Docker容器的IP地址

elasticsearch - 从kafka导入数据到Elasticsearch时如何获取导入进度和错误日志?

apache-kafka - Kafka - 日志压缩行为

java - 使用 spring 中的 Rest 模板通过 post 调用发送 json 数据

java - Spring boot资源文件夹中访问xml的问题

java - jackson @JsonIgnore 属性为 null

elasticsearch - 使用 Kafka 作为 Filebeats 和 Logstash 的替代品

apache-kafka - Kafka 代理在清理日志文件时关闭

java - 管理多帧应用程序

java - Android Studio Button Clock 崩溃程序