spring-integration - 使用 KafkaNativeOffsetManager 时 Spring Integration Kafka 消息处理速度慢

标签 spring-integration apache-kafka

我们一直在使用 SI Kafka 进行一个新项目,并取得了巨大成功。在最近的切换之前,我们使用 KafkaTopicOffsetManager 来管理消费者主题偏移量。为了不让每个消费者/主题对有额外的主题并使用 Burrow 或滞后监控,我们决定使用最新的 KafkaNativeOffsetManager,它使用 Kafka 提供的 native 偏移管理。但在进行切换后,我们注意到来自目标主题的消息的消耗持续滞后。我们知道 KafkaTopicOffsetManager 并没有发生这种情况,因为我们在切换之前已经使用了它几个月。我们还进行了并行测试,并验证了使用 KafkaTopicOffsetManager 时消息的消费与消息的生成几乎是实时的,而 KafkaNativeOffsetManager 总是越来越滞后。两个偏移管理器都使用默认配置,并且都在处理消息后提交偏移(自动确认)。

所以我真的有两个问题,第一个问题不是这篇文章的主要问题。

第一个问题是为什么 native 偏移量管理会比使用主题进行偏移量管理慢?

第二个问题是,我们是否可以将 SI kafka 配置为在成功处理每条消息时不提交偏移量,而是提供不同的策略?我们的想法是,也许我们不应该如此频繁地提交偏移量,而应该将它们作为批量更新进行。例如,在成功处理 25 条消息后或 30 秒后提交偏移量。

谢谢

最佳答案

当禁用自动提交并接收确认 header 时,您唯一需要做的就是在处理消息后调用 acknowledge() 。这假设即使您在不同的线程中处理消息,您也将保留对 Acknowledgment 实例的引用,无论是本身还是作为原始 Message 的一部分- 或者,如果您正在进行转换,则正在复制 header 。但调用需要由您的代码进行。

其次,性能问题 - 这是由于 KafkaNativeOffsetManager 实现对代理进行阻塞、相对更昂贵的调用(相对于简单地将消息发送到压缩主题而言)造成的,正如 KafkaTopicOffsetManager 所做的那样。一般来说,在每条消息之后进行更新的成本很高,在 Spring XD 中,我们通过使用 https://github.com/spring-projects/spring-xd/blob/master/extensions/spring-xd-extension-kafka/src/main/java/org/springframework/integration/x/kafka/WindowingOffsetManager.java 来减轻这种情况,这减少了有效写入的数量。我想我们可以做类似的事情用于 Spring 集成。

也就是说:相比之下,使用 KafkaNativeOffsetManager 需要 9.8 秒完成 100000 次更新,而使用 KafkaTopicOffsetManager 需要 0.382 秒,如 https://github.com/mbogoevici/spring-integration-kafka/blob/perftest/src/test/java/org/springframework/integration/kafka/performance/OffsetManagerPerformanceTests.java 所示。 (结果在我自己的机器上收集)。结果可能会有所偏差,但仍然表明存在很大差异。在 YourKit 中进行跟踪确认了结果。

关于spring-integration - 使用 KafkaNativeOffsetManager 时 Spring Integration Kafka 消息处理速度慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34538314/

相关文章:

apache-kafka - Kafka 流创建时间与日志附加时间

java - 如何在Spring Boot中调用具有相同主题的两个KafkaListener?

java - Spring集成多个客户端连接到服务器端口

tcp - 如何在不使用终止符或固定长度消息的情况下接收 TCP 数据包

java - 反序列化失败时的 Kafka 消费者行为

java - 如何针对故障点确保生产者和消费者应用程序之间的kafka事务同步?

java - 单个kafka消费者-从多个主题读取-消息的消费顺序是什么

spring-integration - 在初始化方法中使用 channel 时,Spring集成 'Dispatcher has no subscribers'

java - 使用 Spring Integration 通过 FTP 下载单个文件

apache-kafka - Kafka 源连接器中的恰好一次语义