java - 在运行时将动态值传递给 kafka 消费者主题以实现消息驱动的 channel 适配器

标签 java spring apache-kafka spring-integration

要求是在运行时设置下面的topics属性,不重启服务器。我们这里如何实现呢? 目前我们正在从属性文件中读取值,但这里需要重新启动服务器才能反射(reflect)所做的更改。

示例: Sample.properties(部署目录内)

topic.list=topic1,topic2

并且希望将来从 topic3 中使用而无需重新启动服务器。

注意:发现topics是一个final变量。

尝试从文件系统路径(部署目录之外)读取 key (topic.list),但没有成功。

任何建议。

<int-kafka:message-driven-channel-adapter

               id="inAdapter"
               channel="fromKafka"
               connection-factory="connectionFactory"
               key-decoder="kafkaKeyDecoder"
               payload-decoder="kafkaDecoder"                              
               topics="${topic.list}"
               offset-manager="offsetManager"/>

最佳答案

您可以使用 Java DSL 按需动态添加适配器以获取其他主题...

@Autowired
private IntegrationFlowContext flowContext;

public void addAnotherListenerForTopics(String... topics) {
    IntegrationFlow flow =
        IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), topics))
            .channel("fromKafka")
            .get();
    this.flowContext.registration(flow).register();
}

bean.addAnotherListenerForTopics("added.new");

pom:

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-java-dsl</artifactId>
  <version>1.2.1.RELEASE</version>
</dependency>

请注意,如果您使用代理分区分配,新容器需要不同的组 ID 以避免撤销现有分配。

关于java - 在运行时将动态值传递给 kafka 消费者主题以实现消息驱动的 channel 适配器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41971105/

相关文章:

java - 将 Kafka 与 Apache Calcite 集成

java - 为什么这段代码会产生非常嘈杂的正弦波?

java - 有没有办法从不同的方法访问数组?

java - AJAX调用java方法

apache-kafka - Apache Beam 中来自 Kafka 源代码的 Exactly once

apache-kafka - 无法建立到节点 -1 的连接。经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient)

java - 如何在不连接数据库的情况下通过Java创建BLOB对象

java - Camel + MyBatis + Apache Aries

java - 将复杂输入从 jquery 传递到 Spring MVC

java - 非 spring web 应用程序是否可以并行 Spring-MVC 应用程序?