我正在使用两个微服务,以便使用 Kafka 相互通信。在一个微服务上有监听器,在另一个微服务上有发送器。问题是每当生产者发送一条消息时,消费者都会收到 20 条重复的消息,而我希望消费者只收到一条消息。
微服务 A:
@Service
public class LocationProducer {
private static final String MAINLOCATION = "mainlocation";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void sendMessage(String message){
this.kafkaTemplate.send(MAINLOCATION,message);
}
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
微服务 B:
@Service
public class LocationConsumer {
@Autowired
LocationService locationService;
@KafkaListener(topics = "mainlocation", groupId = "group_id")
public void consume(String location){
locationService.saveLocation(new Location(1,location));
}
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Docker 组合代码:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPIC: "mainlocation:1:3"
附加信息:
在 intelij 中进行调试时,如果我在以下位置设置断点:
this.kafkaTemplate.send(主地址,消息);
这个方法会被执行两次
在消费者方面,在收到消息后,它尝试保存到 mongoDB 中,但失败并出现异常(可能与此相关,我不知道)
我什至尝试将 autocommit 设置为 true,配置 max.poll seconds,但问题是一样的。
最佳答案
一些可能有帮助的建议。
您将消费者设置为
earliest
。我不熟悉那个特定的 docker 图像,但如果你没有在测试尝试之间修剪卷,那么当你启动你的应用程序时,它可能会从你之前的尝试中获取记录 - 这可能会导致监听器逻辑被执行两次。如果您手动触发发送,您可以将其设置为latest
。如果存在任何可能触发重试的错误,您应该从生产者方面进行检查。您可能想使用 Kafka 的控制台工具检查主题,以查看触发生产者后主题中实际上有多少条记录。
您提到执行失败。默认情况下,
Spring Kafka
消费记录失败会重试10次。如果这是一个问题,您应该使您的服务幂等或禁用重试。您还可以指定不应重试的致命异常。
关于java - Kafka + Spring Boot + Docker Compose - 接收重复消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71840490/