要求是在运行时设置下面的topics属性,不重启服务器。我们这里如何实现呢? 目前我们正在从属性文件中读取值,但这里需要重新启动服务器才能反射(reflect)所做的更改。
示例: Sample.properties(部署目录内)
topic.list=topic1,topic2
并且希望将来从 topic3 中使用而无需重新启动服务器。
注意:发现topics是一个final变量。
尝试从文件系统路径(部署目录之外)读取 key (topic.list),但没有成功。
任何建议。
<int-kafka:message-driven-channel-adapter
id="inAdapter"
channel="fromKafka"
connection-factory="connectionFactory"
key-decoder="kafkaKeyDecoder"
payload-decoder="kafkaDecoder"
topics="${topic.list}"
offset-manager="offsetManager"/>
最佳答案
您可以使用 Java DSL 按需动态添加适配器以获取其他主题...
@Autowired
private IntegrationFlowContext flowContext;
public void addAnotherListenerForTopics(String... topics) {
IntegrationFlow flow =
IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), topics))
.channel("fromKafka")
.get();
this.flowContext.registration(flow).register();
}
和
bean.addAnotherListenerForTopics("added.new");
pom:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
请注意,如果您使用代理分区分配,新容器需要不同的组 ID 以避免撤销现有分配。
关于java - 在运行时将动态值传递给 kafka 消费者主题以实现消息驱动的 channel 适配器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41971105/