java - 如何为 ActiveMQ 队列创建 Spring Boot 消费者?

标签 java spring-boot activemq

我正在学习 ActiveMQ,到目前为止,我已经制作了一个简单的 Spring Boot 生产者+消费者应用程序(出于本问题的目的,将其称为 App1),它与 ActiveMQ 的本地实例进行通信,并且一切都按预期进行。

现在我正在尝试运行另一个 Spring Boot 应用程序(在同一台计算机上,但在确保 App1 未运行之后),该应用程序只有一个消费者(没有生产者),但是当我启动此应用程序时应用程序中,队列中的消息(我使用修改后的 App1 放置的,其中删除了应用程序的使用者部分)不会被拾取。在App1中,消息一发布,消费者就会打印出system.out打印语句,但在这个仅限消费者的应用程序中并非如此。下面是我的监听器组件类:

package com.demo.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "testqueue")
    public void consume(String message) {

        System.out.println("Picked up message: " + message);

    }
}

为了实现所需的行为,我需要进行哪些更改?

App1 application.properties 文件:

spring.activemq.in-memory=false
spring.activemq.pool.enabled=false
server.port=9000
activemq.broker-url=tcp://localhost:61616
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration
security.basic.enabled=false
management.security.enabled=false

App1 JmsConfig 类

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class JmsConfig {

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("testqueue");
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        return  new JmsTemplate(activeMQConnectionFactory());
    }

}

App1 生产者类

import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rest/publish")
public class ProducerResource {

    @Autowired
    JmsTemplate jmsTemplate;

    @Autowired
    Queue queue;

    @GetMapping("/{message}")
    public String publishMessage(@PathVariable("message") final String message) {

        jmsTemplate.convertAndSend(queue, message);

        return "Published successfully";
    }

}

App1 消费者类与我在仅限消费者的应用程序(上面列出)中使用的类相同。

最佳答案

对于您的消费者应用程序,您确实需要为您的消费者 JMS 模板添加池连接工厂和 JMS 消息监听器工厂才能开始接收消息。

@Configuration
@EnableJms
public class ConsumerConfig {

  @Value("${activemqbrokerurl}")
  private String brokerUrl;

  @Bean
  public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(brokerUrl);
    return activeMQConnectionFactory;
  }

  @Bean
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(activeMQConnectionFactory());
    factory.setConcurrency("{#setDesiredConcurrency}");
    return factory;
  }
}

Spring's MessagListenerContainer should be used for message consumption. This provides all the power of MDBs - efficient JMS consumption and pooling of the message listeners - but without requiring a full EJB container.

You can use the activemq-pool org.apache.activemq.pool.PooledConnectionFactory for efficient pooling of the connections and sessions for your collection of consumers, or you can use the Spring JMS org.springframework.jms.connection.CachingConnectionFactory to achieve the same effect.

您可以阅读更多相关信息here

关于java - 如何为 ActiveMQ 队列创建 Spring Boot 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51331135/

相关文章:

java - NumberFormatException Android Java

elasticsearch - Spring Boot Elasticsearch 不起作用

mysql - 如何将 $(特殊字符)插入数据库

ssl - "unable to find valid certification path to requested target"添加新的 Keystore 到 ActiveMQ 之后

java - Active MQ 和 Amazon MQ 之间的区别

Java Jtable 电子表格格式

java - 是否可以通过注释类的对象而不是注释字段来更改类的字段的 json 名称

java - Spring MVC/Spring Data 数据获取递归

mysql - Spring Boot |创建类路径资源中定义的名称为 'entityManagerFactory' 的 bean 时出错

spring - 如何 - Spring 动态 DestinationResolver?