java - Spring AMQP priority messages

Tags: java spring rabbitmq amqp spring-rabbit

I am trying to use message priorities in a RabbitMQ queue. It works with the Java client provided by RabbitMQ, but not with the spring-rabbit dependency. Please take a look.

  • RabbitMQ server version - 3.6.5
  • Erlang - OTP 19 (8.0)

Using the RabbitMQ Java client
pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.springframework.samples</groupId>
    <artifactId>RabbitMQ</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <developers>
        <developer>
            <name>Sagar Rout</name>
        </developer>
    </developers>

    <properties>
        <!-- Generic properties -->
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Spring -->
        <spring-framework.version>4.3.2.RELEASE</spring-framework.version>
    </properties>

    <dependencies>
        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>

        <!-- Spring AMQP -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.6.1.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>  

Publisher.java

    import java.util.HashMap;
    import java.util.Map;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Publisher {

        private final static String QUEUE_NAME = "S1_Priority";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare the queue as a priority queue with a maximum priority of 10.
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-priority", 10);
            channel.queueDeclare(QUEUE_NAME, false, false, false, args);
            String message = "Hello World!";

            // Publish ten persistent messages with priorities 0 through 9.
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", QUEUE_NAME,
                        new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(),
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "' priority " + i);
            }
            channel.close();
            connection.close();
        }
    }

Consumer.java

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    public class Consumer {

        private final static String QUEUE_NAME = "S1_Priority";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // The arguments must match the ones used by the publisher's declaration.
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-priority", 10);
            channel.queueDeclare(QUEUE_NAME, false, false, false, args);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "' priority " + properties.getPriority());
                }
            };
            // Consume with automatic acknowledgement.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

This works: messages with a higher priority are delivered first. But it does not work with spring-rabbit. Here is the code.
RabbitMQConfig.class

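    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

    @Configuration
    @ComponentScan(basePackages = { "com.blackocean.*" })
    @PropertySource("classpath:config.properties")
    public class RabbitMQConfig {

        @Value("${rabbitmq.host}")
        private String host;

        @Value("${rabbitmq.port}")
        private Integer port;

        @Value("${rabbitmq.username}")
        private String username;

        @Value("${rabbitmq.password}")
        private String password;

        @Value("${rabbitmq.connection.size}")
        private Integer connectionSize;

        @Bean
        public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
            return new PropertySourcesPlaceholderConfigurer();
        }

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setHost(host);
            cachingConnectionFactory.setPort(port);
            cachingConnectionFactory.setUsername(username);
            cachingConnectionFactory.setPassword(password);
            cachingConnectionFactory.setConnectionLimit(connectionSize);
            return cachingConnectionFactory;
        }

        @Bean
        public RabbitAdmin rabbitAdmin() {
            return new RabbitAdmin(connectionFactory());
        }

        @Bean
        public RabbitTemplate rabbitTemplate() {
            return new RabbitTemplate(connectionFactory());
        }

        @Bean
        public Queue queue() {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-priority", 10);
            return new Queue("myQueue", true, false, false, args);
        }
    }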

Send1UsingJavaConfig.java

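    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class Send1UsingJavaConfig {

        public static void main(String[] args) {
            ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

            // Send to the default exchange and set the priority on the outgoing message.
            rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() {

                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setPriority(9);
                    return message;
                }
            });
        }
    }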

RecvUsingJavaConfig.java

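    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;

    public class RecvUsingJavaConfig {

        public static void main(String[] args) {
            ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class);
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

            // Basic example: fetch a single message from the queue.
            String message = (String) rabbitTemplate.receiveAndConvert("myQueue");
            System.out.println(message);
        }
    }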

config.properties

#RabbitMQ
rabbitmq.host=localhost
#Always provide port and connection size in numbers
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.connection.size=100

Now I send messages with different priorities, but they are always received in the order they were sent. Any suggestions would be great!

Best answer

This is only a guess. I looked at an old AMQP library I used in the past (priority queues in an older version of RabbitMQ).

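There, the priority was set as follows:

args.put("x-max-priority", 10); which differs from the args.put("x-priority", 10); in your RabbitMQConfig. Your working Java client code also declares the queue with x-max-priority.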
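To make the difference concrete, here is a minimal sketch of a helper that builds the queue arguments so the key cannot be mistyped. The names `PriorityQueueArguments` and `forMaxPriority` are hypothetical and belong to neither Spring AMQP nor the RabbitMQ client. Only the `x-max-priority` key comes from the code above, and the 1 to 255 bound is my understanding of the documented limit, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of any library.
final class PriorityQueueArguments {

    // Queue argument used by the working Java client code in the question.
    static final String MAX_PRIORITY_KEY = "x-max-priority";

    private PriorityQueueArguments() {
    }

    // Builds the arguments map for declaring a priority queue.
    static Map<String, Object> forMaxPriority(int maxPriority) {
        // Assumed bound: RabbitMQ documents 1..255 as the supported range.
        if (maxPriority < 1 || maxPriority > 255) {
            throw new IllegalArgumentException("maxPriority must be between 1 and 255: " + maxPriority);
        }
        Map<String, Object> args = new HashMap<>();
        args.put(MAX_PRIORITY_KEY, maxPriority);
        return args;
    }

    public static void main(String[] argv) {
        // Prints {x-max-priority=10}
        System.out.println(forMaxPriority(10));
    }
}
```

In the `queue()` bean this map would be passed as the last constructor argument, for example `new Queue("myQueue", true, false, false, PriorityQueueArguments.forMaxPriority(10))`. Since `myQueue` already exists on the broker with the old arguments, it will probably need to be deleted first, because RabbitMQ refuses to redeclare an existing queue with different arguments.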

You can refer to the old priority queue repo in the link. Try it and see whether it helps.
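When trying this, it helps to know which order to expect. The following is a small in-memory model, not RabbitMQ itself. It assumes that, among messages already waiting in the queue, higher priorities are delivered first and equal priorities keep their publish order. `PriorityOrderModel` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// In-memory model of the expected delivery order; it does not talk to a broker.
final class PriorityOrderModel {

    // One waiting message: payload, priority, and a sequence number recording publish order.
    private static final class Entry {
        final String body;
        final int priority;
        final long sequence;

        Entry(String body, int priority, long sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; ties are broken by publish order (lowest sequence first).
    private final PriorityQueue<Entry> waiting = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> e.priority)
                    .reversed()
                    .thenComparingLong(e -> e.sequence));

    private long nextSequence = 0;

    void publish(String body, int priority) {
        waiting.add(new Entry(body, priority, nextSequence++));
    }

    // Removes and returns all waiting messages in the modeled delivery order.
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        Entry next;
        while ((next = waiting.poll()) != null) {
            delivered.add(next.body);
        }
        return delivered;
    }

    public static void main(String[] argv) {
        PriorityOrderModel queue = new PriorityOrderModel();
        // Same pattern as Publisher.java: ten messages with priorities 0 through 9.
        for (int i = 0; i < 10; i++) {
            queue.publish("msg-" + i, i);
        }
        // Prints [msg-9, msg-8, msg-7, msg-6, msg-5, msg-4, msg-3, msg-2, msg-1, msg-0]
        System.out.println(queue.drain());
    }
}
```

Note that priorities only reorder messages that are waiting in the queue. If a consumer takes each message as soon as it arrives, the order will look like plain FIFO, so publish a batch first and start receiving afterwards when testing.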

Regarding java - Spring AMQP priority messages, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39436274/
