java - 找不到“org.springframework.kafka.core.ConsumerFactory”

标签 java spring-boot apache-kafka spring-kafka

我需要使用 Apache Kafka 和 org.springframework.kafka 依赖项来使用来自 Spring Boot 应用程序的 json 对象消息,但出现以下错误:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found. 

这是我的 bean 类,其中包含为 Kafka 配置创建的所有必需 bean。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String,Message> consumerFactory(){
        Map<String,Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
                new JsonDeserializer<>(Message.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListener(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

有人可以帮助我吗?我究竟做错了什么?谢谢。

最佳答案

您可以使用application.properties/application.yml来实现这一点。
请注意,Spring Boot 允许我们避免在创建带有 @Configuration
注释的 Java 类时引入的所有样板代码 application.yml 示例:

server:
  port: 8085

#Kafka config props:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #Consumer Deserialization:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: myGroupId
      enable-auto-commit: false
    listener:
      missing-topics-fatal: false

bootstrap-servers:可用的 Kafka 服务器列表。

key-deserializer:消费者 key 反序列化类。使用 Kafka 库的 StringDeserializer 类。

value-deserializer:消费者值反序列化类。当您使用 JSON 格式的字符串消息时,可以使用 Kafka 库的 StringDeserializer 类。

group-id:Kafka 消费者的组 ID 值。

enable-auto-commit:将此值设置为 false 可以手动提交偏移量消息,这可以避免消费者在处理当前消费的消息时消费新消息而崩溃消费者。

listener.missing-topics-fatal:通过将该值设置为 false,您可以避免在代理上不存在任何已配置主题的情况下在应用程序启动期间显示不必要的错误。

希望这有帮助。

关于java - 找不到“org.springframework.kafka.core.ConsumerFactory”,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60806326/

相关文章:

java - 调用后单击按钮即可下载 Excel 报告

hadoop - 在 Kerberos 下为 Kafka 启动 Spark-Submit 作业

java - 在 Java 中实例化 BiMap 的 google-collections

java - 为什么间接增量比直接增量快?

java - 包含 Openshift Java RESTClient 时构建失败

apache-kafka - 具有 QoS/Kafka 分区过载的消息传递平台

apache-kafka - kafka 如何 ack 批量 AsyncProducer

java - 死存储到 List 对象 - 如何解决?

Java从父对象实例化子对象

java - spring-boot web应用程序无法启动: Unable to start ServletWebServerApplicationContext due to missing ServletWebServerFactory bean