我需要使用 Apache Kafka 和 org.springframework.kafka 依赖项来使用来自 Spring Boot 应用程序的 json 对象消息,但出现以下错误:
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
这是我的 bean 类,其中包含为 Kafka 配置创建的所有必需 bean。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,Message> consumerFactory(){
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListener(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
有人可以帮助我吗?我究竟做错了什么?谢谢。
最佳答案
您可以使用application.properties/application.yml
来实现这一点。
请注意,Spring Boot 允许我们避免在创建带有 @Configuration
注释的 Java 类时引入的所有样板代码
application.yml 示例:
server:
port: 8085
#Kafka config props:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
#Consumer Deserialization:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: myGroupId
enable-auto-commit: false
listener:
missing-topics-fatal: false
bootstrap-servers
:可用的 Kafka 服务器列表。
key-deserializer
:消费者 key 反序列化类。使用 Kafka 库的 StringDeserializer 类。
value-deserializer
:消费者值反序列化类。当您使用 JSON 格式的字符串消息时,可以使用 Kafka 库的 StringDeserializer 类。
group-id
:Kafka 消费者的组 ID 值。
enable-auto-commit
:将此值设置为 false 可以手动提交偏移量消息,这可以避免消费者在处理当前消费的消息时消费新消息而崩溃消费者。
listener.missing-topics-fatal
:通过将该值设置为 false,您可以避免在代理上不存在任何已配置主题的情况下在应用程序启动期间显示不必要的错误。
希望这有帮助。
关于java - 找不到“org.springframework.kafka.core.ConsumerFactory”,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60806326/