java - spring kafka 监听器在错误的位置查找 ContainerProperties

标签 java apache-kafka spring-kafka

我正在尝试在我的 spring boot 应用程序中使用 kafka 监听器,但服务器启动失败并出现以下错误:根本原因是它正在 org.springframework.kafka.listener.config 中寻找 ContainerProperties org.springframework.kafka.listener

中可用的子包
***************************
APPLICATION FAILED TO START
***************************

Description:

An attempt was made to call the method org.springframework.kafka.listener.AbstractMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties; but it does not exist. Its class, org.springframework.kafka.listener.AbstractMessageListenerContainer, is available from the following locations:

    jar:file:/Users/kumarman/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.class

It was loaded from the following location:

    file:/Users/kumarman/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.kafka.listener.AbstractMessageListenerContainer

我的依赖图是:

lon2002619:wmt-service kumarman$ mvn dependency:tree | grep kafka
[INFO] +- org.springframework.kafka:spring-kafka:jar:2.2.0.RELEASE:compile
[INFO] |  \- org.apache.kafka:kafka-clients:jar:2.0.0:compile
[INFO] +- org.springframework.kafka:spring-kafka-test:jar:2.2.0.RELEASE:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.0.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.0.0:test
[INFO] |  \- org.apache.kafka:kafka_2.11:jar:test:2.0.0:test
[INFO] |  |     +- io.zipkin.brave:brave-instrumentation-kafka-clients:jar:5.1.0:compile
[INFO] |     +- io.zipkin.reporter2:zipkin-sender-kafka11:jar:2.7.3:compile

而kafka的配置代码是:

@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset());

        return props;
    }

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

启用跟踪日志后的堆栈跟踪:

2018-11-29 10:51:05.292 TRACE [kumarman-wmt-service,,,] 46241 --- [           main] o.s.c.io.support.SpringFactoriesLoader   : Loaded [org.springframework.boot.diagnostics.FailureAnalysisReporter] names: [org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter]
2018-11-29 10:51:05.301 DEBUG [kumarman-wmt-service,,,] 46241 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : Application failed to start due to an exception

java.lang.NoSuchMethodError: org.springframework.kafka.listener.AbstractMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
    at org.springframework.cloud.sleuth.instrument.messaging.SleuthKafkaAspect.wrapListenerContainerCreation(TraceMessagingAutoConfiguration.java:191) ~[spring-cloud-sleuth-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory$$EnhancerBySpringCGLIB$$d4611e6.createListenerContainer(<generated>) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:164) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:158) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:259) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at com.betstars.wmt.app.WmtServiceApplication.main(WmtServiceApplication.java:14) [classes!/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [wmt-service-1.0.0-SNAPSHOT.jar:na]

Pom 提取物:

<spring.boot.version>2.1.0.RELEASE</spring.boot.version>
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
<!-- Zipkin - tracing -->
<dependency>
<groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

最佳答案

也许您在代码中引用了它?

它在 2.2 中从 listener.config 移动到 listener。

https://github.com/spring-projects/spring-kafka/commit/b048aaa8f00045e4cc65d5d137b1aa372beca3a2#diff-7121e19c8f33f6bddfd42b749f0bddb0

参见 what's new .

编辑

我能够通过以下人为的示例重现您的问题:

针对spring-kafka 2.0编译这个类...

public class Foo {

    public void referenceOldClass(KafkaMessageListenerContainer<String, String> container) {
        container.getContainerProperties();
    }

}

然后从 Boot 2.1 应用程序中引用该类:

@SpringBootApplication
public class So53503830Application {

    public static void main(String[] args) {
        SpringApplication.run(So53503830Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    @Autowired
    private ConsumerFactory<String, String> cf;

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            ContainerProperties props = new ContainerProperties("topic");
            KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(this.cf,  props);
            foo.referenceOldClass(container);
        };
    }

}

使用 DEBUG 日志记录,我们得到堆栈跟踪...

java.lang.NoSuchMethodError: org.springframework.kafka.listener.KafkaMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
    at com.example.Foo.referenceOldClass(Foo.java:29) ~[classes/:na]
    at com.example.So53503830Application.lambda$0(So53503830Application.java:32) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.example.So53503830Application.main(So53503830Application.java:16) [classes/:na]

2018-11-28 15:02:56.962 ERROR 95564 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

An attempt was made to call the method org.springframework.kafka.listener.KafkaMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties; but it does not exist. Its class, org.springframework.kafka.listener.KafkaMessageListenerContainer, is available from the following locations:

    jar:file:/Users/grussell/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.class

It was loaded from the following location:

    file:/Users/grussell/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.kafka.listener.KafkaMessageListenerContainer

关于java - spring kafka 监听器在错误的位置查找 ContainerProperties,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53503830/

相关文章:

java - Java 中的 boolean 变量存储什么? (根据内存、位模式)

java - 如果在 springboot 的结果中合并/组合,缓存方法(使用咖啡因)是否可能返回部分响应?

java - 如何创建动态 ANTLR4 词法分析器规则

java - 使用 KafkaListener 从 Kafka 消费一次最少 N 条消息

spring-boot - Spring Boot Kafka - Kafka 指标在/actuator/prometheus 中不可用

apache-kafka - KStream-KStream leftJoin 在窗口到期后不一致地发出

java - IntelliJ 在 JPQL 中显示(警告)错误

apache-kafka - Apache Kafka 如何使用打开的文件描述符?

apache-kafka - 有没有办法保证kafka topic中不插入重复记录?

hadoop - 数据从 Kafka 流向 HDFS 时,Flume 空间不足错误