java - Spring-Kafka 使用 ConcurrentKafkaListenerContainerFactory 来处理多个 @Kafkalistener

标签 java spring-boot apache-kafka spring-kafka

我正在致力于使用 Spring-Kafka 框架实现 Kafka Topics 消息的消费。我试图了解我为 Kafka Listener 创建的 ConcurrentKafkaListenerContainerFactory 的一些用法。 @KafkaListener 工作正常并且符合预期,但是,在我的场景中,我有多个独立的监听器,分别监听多个主题。我想知道我是否可以在所有监听器中重用 ConcurrentKafkaListenerContainerFactory,或者我是否必须为每个 @KafkaListener 创建一个 containerFactory。有没有办法拥有一个可以在所有@Kafkalistener之间共享的通用containerFactory

谢谢你

最佳答案

是的;这就是重点 - 它是监听器容器的工厂;您通常只需要一个启动自动配置的工厂。

如果您需要监听器的不同属性(例如反序列化器),最新版本(自 spring-kafka 2.2.4 起)允许您覆盖注释上的使用者属性。

覆盖其他属性,例如容器属性,对于单个监听器,将监听器容器定制器添加到工厂。

@Component
class ContainerFactoryCustomizer {

    ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
        factory.setContainerCustomizer(
                container -> {
                    String groupId = container.getContainerProperties().getGroupId();
                    if (groupId.equals("foo")) {
                        container.getContainerProperties().set...
                    }
                    else {
                        container.getContainerProperties().set...
                    }
                });
    }

正如您所看到的,通过访问 groupId() 容器属性,您可以在调用它时知道我们正在创建哪个容器。

如果您的监听器的配置差异很大,您可能需要使用 2 个工厂,但这样您就会失去启动的自动配置功能(至少对于工厂而言)。

关于java - Spring-Kafka 使用 ConcurrentKafkaListenerContainerFactory 来处理多个 @Kafkalistener,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61436049/

相关文章:

java - Kafkameter NoClassDefFoundError

amazon-web-services - "Client subnets"在 Amazon Managed Streaming for Apache Kafka 中意味着什么?

java - 如何在 TabLayout.Tab 上添加删除图标以通过单击添加的删除图标来删除选项卡

java - 在一段时间后调用 API 时第一次获取连接重置异常

java - 方法的编译器错误

java - 如何从 Pivotal Cloud Foundry 中托管的 Java 应用程序访问 Windows NAS 文件共享

java - Spring Boot 无法识别 MongoDB RestController 类

apache-kafka - 我可以设置Kafka Stream消费者group.id吗?

java - 如何创建可重复使用的 map

java - 下载文件夹的一般路径