我有一个问题,我不知道如何动态设置主机并在不同主机上进行 RPC 操作
情况是这样的
我有多个 RabbitMQ 在不同的服务器和网络上运行(即 192.168.1.0/24、192.168.2.0/24)。
行为是我有一个 IP 地址列表,我将使用它执行 RPC。
因此,对于 ip 地址列表中的每个条目,我想执行 convertSendAndReceive
并处理回复等。
尝试了文档中的一些代码,但似乎即使无效地址(没有运行有效的 RabbitMQ 的地址,或者网络上不存在事件的地址,例如 1.1.1.1)也无法工作有效的 RabbitMQ(例如在 192.168.1.1 上运行)
注意:我可以在正确的地址上成功执行 RPC 调用,但是,我也可以在我不认为的无效地址上成功执行 RPC 调用
有人对此有任何想法吗?
这是我的来源
TaskSchedulerConfiguration.java
@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
@Autowired
private IpAddressRepo ipAddressRepo;
@Autowired
private RemoteProcedureService remote;
@Scheduled(fixedDelayString = "5000", initialDelay = 2000)
public void scheduledTask() {
ipAddressRepo.findAll().stream()
.forEach(ipaddress -> {
boolean status = false;
try {
remote.setIpAddress(ipaddress);
remote.doSomeRPC();
} catch (Exception e) {
logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
logger.debug(e.getMessage(), e);
}
});
}
}
RemoteProcedureService.java
@Service
public class RemoteProcedureService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange exchange;
public boolean doSomeRPC() throws JsonProcessingException {
//I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
//at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
return response;
}
public void setIpAddress(String host) {
factory.setHost(host);
factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
factory.setPort(prop.getRabbitMQPort());
factory.setUsername(prop.getRabbitMQUsername());
factory.setPassword(prop.getRabbitMQPassword());
template.setConnectionFactory(factory);
}
}
AmqpConfiguration.java
@Configuration
public class AmqpConfiguration {
public static final String topicExchangeName = "testExchange";
public static final String queueName = "rpc";
@Autowired
private LicenseVisualizationProperties prop;
//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
// @Bean
// public ConnectionFactory connectionFactory() {
// CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
// connectionFactory.setPort(prop.getRabbitMQPort());
// connectionFactory.setUsername(prop.getRabbitMQUsername());
// connectionFactory.setPassword(prop.getRabbitMQPassword());
// return connectionFactory;
// }
@Bean
public DirectExchange exhange() {
return new DirectExchange(topicExchangeName);
}
}
更新1
看来,在循环期间,当在CachingConnectionFactory
中设置一个有效的ip时,无论有效还是无效,都会被中设置的第一个有效ip接收缓存连接工厂
更新2
我发现一旦它能够成功建立连接,它就不会创建新的连接。如何强制 RabbitTemplate
建立新连接?
最佳答案
这是一个相当奇怪的用例,并且性能不会很好;你最好有一个连接工厂和模板池。
但是,回答你的问题:
调用resetConnection()
关闭连接。
关于java - 在 Spring Boot 上动态设置 Spring AMQP 和 RabbitMQ 的主机,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62307711/