我刚刚开始将 Spring Boot Stream 与 Kafka 结合使用。
我创建了一个生产者和一个消费者。我需要的是有两个相同的消费者(实际上是两个微服务)但具有不同的 groupId,因此它们都将读取主题并获取相同的消息。
现在我在properties.yml文件的资源下的spring boot应用程序项目中有groupId,是否可以在编译时将该值设置为参数或在启动时更好?
properties.yml
server:
port: 8087
eureka:
client:
serviceUrl:
defaultZone: http://IP:8761/eureka
spring:
application:
name: employee-producer
cloud:
stream:
kafka:
binder:
brokers: IP:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json
kafka:
consumer:
group-id: 500
client-id: 99
最佳答案
根据要求,您需要同一主题的不同组(即group.id)的两个消费者,以便每个消息都可以被两个消费者消费
根据文档group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
group.id需要在kafkaconsumerfactory初始化时设置
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
每当将具有 qnique group.id 的新组消费者添加到主题时,它将使用最新消息,因为默认情况下 auto.offset.reset 是最新
例如:
- 首先向kafka发送5条消息
- 现在添加新的消费者(它不会使用这些消息,因为默认偏移量是最新的)
要使其消耗这些消息偏移量,应将其指定为最早
关于java - 是否可以在启动或编译时在 Spring Boot Stream Kafka 中设置 groupId?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51205932/