java - Spring 集成: Reading from Kafka Queue

标签 java spring-integration spring-kafka

在 Spring Boot 应用程序中,我想使用 Spring Integration 从 Kafka 队列中读取数据。配置如下:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = ... // set properties
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public DirectChannel receiver() {
    return new DirectChannel();
}

@Autowired
private Resolver resolver;

@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
    return new EventDrivenConsumer(receiver(), resolver);
}

Resolver bean 实现 MessageHandler

消息在队列上接收,但不由解析器 bean 处理。

Spring Boot应用程序注释如下:

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)

所以 Kafka beans 不应该自动配置。

错误如下:

java.lang.NullPointerException: null
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

调试时,很明显在 RecordMessagingMessageListenerAdapter(堆栈跟踪顶部)中,this.methodHandler 为 null。

Spring Integration 中将 channel 连接到应处理消息的 bean 的正确方法是什么?

最佳答案

这是决议:

该项目的父声明如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

对于以下依赖项:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>${spring-integration-kafka.version}</version>
</dependency>

之前使用过,spring-integration-kafka.version:

3.0.1.RELEASE

将其更改为:

2.1.0.RELEASE

一切正常。

但是,如果没有 spring-integration-kafka 的显式版本,该项目将因缺少类而无法构建。

Boot 的优点之一是它可以处理依赖版本。也许应该有一个 spring-boot-integration-kafka 依赖项,这可以防止此问题。

关于java - Spring 集成: Reading from Kafka Queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48904101/

相关文章:

Java如何将包含多个 double 的byte []转换为double []

java - 为什么带有可变参数的 Java 方法被识别为 transient ?

Java:编译冲突:返回类型与 HATEOAS ResourceSupport.getId 不兼容

error-handling - Spring Integration - 如何将错误处理委托(delegate)给单独的线程

java - 如何为 MongoDbMessageStore.MessageReadingMongoConverter 设置 mapKeyDotReplacement

spring-boot - Spring Reactive kafka Receiver 总是将引导服务器覆盖到本地主机

apache-kafka - 如何每小时汇总数据?

java - 如何在 java 中将 Long 转换为 byte[] 并返回

Spring集成计划作业

用于 Kafka 的 Azure 事件中心,具有来自同一组的 2 个消费者无限重新平衡