java - 在 Spring Boot 上动态设置 Spring AMQP 和 RabbitMQ 的主机

标签 java spring rabbitmq spring-amqp

我有一个问题,我不知道如何动态设置主机并在不同主机上进行 RPC 操作

情况是这样的

我有多个 RabbitMQ 在不同的服务器和网络上运行(即 192.168.1.0/24、192.168.2.0/24)。

行为是我有一个 IP 地址列表,我将使用它执行 RPC。 因此,对于 ip 地址列表中的每个条目,我想执行 convertSendAndReceive 并处理回复等。

尝试了文档中的一些代码,但似乎即使无效地址(没有运行有效的 RabbitMQ 的地址,或者网络上不存在事件的地址,例如 1.1.1.1)也无法工作有效的 RabbitMQ(例如在 192.168.1.1 上运行)

注意:我可以在正确的地址上成功执行 RPC 调用,但是,我也可以在我不认为的无效地址上成功执行 RPC 调用

有人对此有任何想法吗?

这是我的来源

TaskSchedulerConfiguration.java

@Configuration
@EnableScheduling
public class TaskSchedulerConfiguration {
    @Autowired
    private IpAddressRepo ipAddressRepo;

    @Autowired
    private RemoteProcedureService remote;

    @Scheduled(fixedDelayString  = "5000", initialDelay = 2000)
    public void scheduledTask() {
        ipAddressRepo.findAll().stream()
              .forEach(ipaddress -> {
                boolean status = false;
                try {
                    remote.setIpAddress(ipaddress);
                    remote.doSomeRPC();                 
                } catch (Exception e) {
                    logger.debug("Unable to Connect to licenser server: {}", license.getIpaddress());
                    logger.debug(e.getMessage(), e);
                } 
              });

    }

}

RemoteProcedureService.java

@Service
public class RemoteProcedureService {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange exchange;


    public boolean doSomeRPC() throws JsonProcessingException {

        //I passed this.factory.getHost() so that i will know if only the valid ip address will be received by the other side
        //at this point, other side receives invalid ipaddress which supposedly will not be receive by the oher side
        boolean response = (Boolean) template.convertSendAndReceive(exchange.getName(), "rpc", this.factory.getHost());
        return response;
    }

    public void setIpAddress(String host) {
        factory.setHost(host);
        factory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
        factory.setPort(prop.getRabbitMQPort());
        factory.setUsername(prop.getRabbitMQUsername());
        factory.setPassword(prop.getRabbitMQPassword());
        template.setConnectionFactory(factory);

    }

}

AmqpConfiguration.java

@Configuration
public class AmqpConfiguration {
    public static final String topicExchangeName = "testExchange";

    public static final String queueName = "rpc";

    @Autowired
    private LicenseVisualizationProperties prop;

//Commented this out since this will only be assigne once
//i need to achieve to set it dynamically in order to send to different hosts
//so put it in RemoteProcedureService.java, but it never worked
//    @Bean
//    public ConnectionFactory connectionFactory() {
//        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//        connectionFactory.setCloseTimeout(prop.getRabbitMQCloseConnectTimeout());
//        connectionFactory.setPort(prop.getRabbitMQPort());
//        connectionFactory.setUsername(prop.getRabbitMQUsername());
//        connectionFactory.setPassword(prop.getRabbitMQPassword());
//        return connectionFactory;
//    }

    @Bean
    public DirectExchange exhange() {
        return new DirectExchange(topicExchangeName);
    }

}

更新1

看来,在循环期间,当在CachingConnectionFactory中设置一个有效的ip时,无论有效还是无效,都会被中设置的第一个有效ip接收缓存连接工厂

更新2

我发现一旦它能够成功建立连接,它就不会创建新的连接。如何强制 RabbitTemplate 建立新连接?

最佳答案

这是一个相当奇怪的用例,并且性能不会很好;你最好有一个连接工厂和模板池。

但是,回答你的问题:

调用resetConnection()关闭连接。

关于java - 在 Spring Boot 上动态设置 Spring AMQP 和 RabbitMQ 的主机,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62307711/

相关文章:

java - Micrometer 中的计数器线程安全吗

java - 维基百科解析器

java - 如何计算字符串中字符的频率?

java - Java中两个非均匀数组中的每个元素相加

java - 有什么方法可以为@RequestParam 获取BindingResult?

spring - org.hibernate.hql.internal.ast.QuerySyntaxException : Employee is not mapped [from Employee];

java - JMSException InterruptedIOException - 生产者线程被中断

rabbitmq - 如何编写自定义水槽 OG 接收器

redis - 建议 - Redis 或 RabbitMQ PubSub - 拍卖服务

java - 如何从html :options collections获取选定选项值