spring-cloud - Spring Cloud Stream Kafka - 最终一致性 - Kafka 是否自动重试未确认的消息(使用 autocommitoffset=false 时)

标签 spring-cloud microservices distributed-transactions spring-cloud-stream spring-kafka

事实证明,实现最终一致的分布式架构是一件痛苦的事情。有大量的博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。

我遇到的一个方面是在消息未被确认时必须处理消息的手动重试。

例如:我的订单服务向 Kafka 发送一个支付事件。支付服务订阅它并处理它,回答支付成功或支付失败

  1. 请求支付:订单服务----支付事件----> Kafka ----支付事件---->支付服务

    <
  2. Payment OK: -> Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service

  3. 支付失败-> 支付服务----支付失败事件----> Kafka ----支付失败事件---->订单服务

重点是:

我可以确定何时使用同步发送将消息传送到 Kafka。但是,我必须知道付款已由付款服务处理的唯一方法是期待一个应答事件(付款正常 | 付款失败)。

这迫使我在订单服务器中实现重试机制。如果它在一段时间内没有得到答案,请使用新的支付事件重试。

此外,这还迫使我处理支付服务中的重复消息,以防它们实际上已被处理但答案没有到达订单服务。

我想知道 Kafka 是否有一个内置机制来在消费者不确认消息的新偏移量时发送重试

在 Spring Cloud Stream 中,我们可以将 autoCommitOffset 属性设置为 false 并处理消费者中偏移量的确认:

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }

如果我们不执行 acknowledgment.acknowledge(); 消息会被 Kafka 自动重新发送给这个消费者吗?

如果可能的话,我们就不需要再手动重试了,可以做这样的事情:

付款人服务:

 @Autowired
 private PaymentBusiness paymentBusiness;

 @StreamListener(Sink.INPUT)
 public void process(Order order) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         paymentBusiness(order);            
         //If we don't get here because of an exception 
         //Kafka would retry...
         acknowledgment.acknowledge();
     }
 }

如果可以的话,Kafka 中的重试周期是如何配置的?

在最坏的情况(也是最有可能的)情况下,这不受支持,我们将不得不手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的任何真实示例吗?

最佳答案

What happens if we don't execute acknowledgment.acknowledge(); Will the message be automatically resent by Kafka to this consumer?

没有。只要客户端打开,Kafka 消费者就会按顺序读取消息。 Kafka 不支持更复杂的确认模式,例如单独的消息确认,仅更新给定消费者组和分区主题的偏移量。 Spring Cloud Stream 支持在 Spring Cloud Stream 中对消息进行异步处理的场景进行手动确认(从而防止消息丢失)——但前提是一旦消息被手动确认,它的偏移量就会被保存,因此所有以前的消息都来自同一个分区主题将被视为“已读”。如果您想挑出失败的消息,您可以使用 DLQ 支持 - 并让后续的消费者接收它们。重新启动消费者将从上次保存的偏移量恢复读取,因此您可以选择不为一系列未成功处理的消息保存偏移量。

Spring Cloud Stream 消费者具有内置的重试和 DLQ 支持 - 请参阅 http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties 中的 enableDlq以及作为默认使用者属性的一部分提供的重试设置:http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties

关于spring-cloud - Spring Cloud Stream Kafka - 最终一致性 - Kafka 是否自动重试未确认的消息(使用 autocommitoffset=false 时),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42230797/

相关文章:

microservices - 微服务中 API 网关的异步响应

java - Spring Cloud Dataflow Kubernetes 从 dockerfile 获取 jar 的属性

java - 为什么 java 应用程序在 Docker 容器中启动而不在暴露的端口上可用?

spring-boot - 动态刷新spring boot配置

java - 即使服务器中途出现故障,如何可靠地确保本地和远程数据副本均已更新

microservices - 如何在微服务架构中实现即时一致性?

java - 如何管理Web服务调用和数据库之间的分布式事务协调?

spring-cloud - Spring Cloud Zuul - 使用生成的 jwt token 时拒绝来自 auth-server 的访问

laravel - 在流明框架中启用 session

node.js - Node.js 微服务之间的通信