java - 使用 DeadLetterPublishingRecoverer 和手动 ack 模式的 Spring Kafka 监听器

标签 java spring spring-kafka

我很难理解如何解决这个问题,所以我在这里提问,希望其他人已经遇到过同样的问题。 我们正在使用手动确认模式和重试次数为 3 的死信恢复器运行我们的@KafkaListener。 由于在某些情况下(外部依赖)我们不确认消息并暂停消费 5 分钟的业务逻辑,因此需要手动确认模式。

此外,我们确实需要死信队列来存放由于某种原因无法处理的消息。

现在手动确认模式的问题是我们的监听器/消费者在达到重试限制并将其移动到 dl 队列时不确认消息。

如果消费者服务将被重新启动,他会尝试再次消费消息并将它们移动到 dl 队列。

有什么办法可以解决这个问题吗?

来自汉堡的感谢和问候!

最佳答案

如果可能,我会尽量避免使用手动确认;也许通过增加 max.poll.interval.ms 来代替。

如果您使用 AckMode.MANUAL_IMMEDIATE,直接在错误处理程序中的 Consumer 上执行提交是安全的。

继承SeekToCurrentErrorHandler并覆盖handle(),如果super.handle()没有抛出异常,则意味着重试超过了,您可以在 Consumer 上提交偏移量。

关于java - 使用 DeadLetterPublishingRecoverer 和手动 ack 模式的 Spring Kafka 监听器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53781327/

相关文章:

java - 一台 Apache Tomcat 应用程序服务器,带有 2 个已配置的 war 应用程序和一个 MessageSource 异常

java - 当我的集合作为 ArrayCollection 发送并在服务器上作为 java.util.Set 接收时,如何保持集合的排序顺序?

java - EmbeddedKafkaBroker 设置属性抛出 NoClassDefFoundError : kafka/zk/EmbeddedZookeeper

java - 无法使用ChainedKafkaTransaction同步Kafka和MQ事务

java - 使用 KafkaProperties 配置多个 kafka 主题名称的最佳方法是什么

java - 随机访问文件.write : EBADF (Bad file descriptor)

java - 带有 DFC 代码的 Jython

java - 从类中获取bean id

java - 枚举、单例和反序列化

java - 如何让 Graal SDK 包在没有 Maven 的情况下工作?