java - Kafka + Spring Boot + Docker Compose - 接收重复消息

标签 java docker apache-kafka spring-kafka

我正在使用两个微服务,以便使用 Kafka 相互通信。在一个微服务上有监听器,在另一个微服务上有发送器。问题是每当生产者发送一条消息时,消费者都会收到 20 条重复的消息,而我希望消费者只收到一条消息。

微服务 A:

@Service
public class LocationProducer {

private static final String MAINLOCATION = "mainlocation";

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

public void sendMessage(String message){

    this.kafkaTemplate.send(MAINLOCATION,message);
}



kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

微服务 B:

@Service
public class LocationConsumer {

@Autowired
LocationService locationService;

@KafkaListener(topics = "mainlocation", groupId = "group_id")
public void consume(String location){
    locationService.saveLocation(new Location(1,location));
}




kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Docker 组合代码:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPIC: "mainlocation:1:3"

附加信息:

  1. 在 intelij 中进行调试时,如果我在以下位置设置断点:

    this.kafkaTemplate.send(主地址,消息);

    这个方法会被执行两次

  2. 在消费者方面,在收到消息后,它尝试保存到 mongoDB 中,但失败并出现异常(可能与此相关,我不知道)

  3. 我什至尝试将 autocommit 设置为 true,配置 max.poll seconds,但问题是一样的。

最佳答案

一些可能有帮助的建议。

  • 您将消费者设置为earliest。我不熟悉那个特定的 docker 图像,但如果你没有在测试尝试之间修剪卷,那么当你启动你的应用程序时,它可能会从你之前的尝试中获取记录 - 这可能会导致监听器逻辑被执行两次。如果您手动触发发送,您可以将其设置为 latest

  • 如果存在任何可能触发重试的错误,您应该从生产者方面进行检查。您可能想使用 Kafka 的控制台工具检查主题,以查看触发生产者后主题中实际上有多少条记录。

  • 您提到执行失败。默认情况下,Spring Kafka 消费记录失败会重试10次。如果这是一个问题,您应该使您的服务幂等或禁用重试。您还可以指定不应重试的致命异常。

关于java - Kafka + Spring Boot + Docker Compose - 接收重复消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71840490/

相关文章:

java - JSON 输出不遵循模型中的 JAXB 注释

php - 将Dockerfile从php:7.2-fpm更新到php:7.3-fpm时出错

ruby - 一段时间后,使用slack-ruby-bot gem从Slack机器人获取多个答案

java - 互联网上的点对点连接

java - 计算器错误

c++ - 将 3rd 方异步 API 与 Cap'n Proto RPC 集成的好方法是什么?

spring-boot - 使用 Spring Boot 创建 Kafka 主题

java - 如何使用 Java API 添加 SCRAM-SHA-512 kafka 配置?

Java:在查找哈希表的键时,重写 equals 方法不起作用?

docker - 如何配置docker-compose卷访问授权