java - Spring AMQP RabbitMQ 实现优先级队列

标签 java apache-camel rabbitmq spring-amqp spring-rabbit

谷歌几天后,我相信我完全迷路了。我想实现一种具有大约 3 个队列的优先级队列:

  1. 高优先级队列(每日),需要首先处理。
  2. 中等优先级队列(每周),如果队列 #1 中没有项目,它将处理。 (这个队列中的消息没问题,它根本不会处理)
  3. 低优先级队列(每月),如果队列 #1 和 #2 中没有项目,它将处理。 (这个队列中的消息没问题,它根本不会处理)

最初我有以下流程,让消费者消费来自所有三个队列的消息并检查队列#1、#2 和#3 中是否有任何项目。然后我意识到这是错误的,因为:

  1. 我完全迷失了一个问题:“我怎么知道它来自哪个队列?”。
  2. 我已经在使用来自任何队列的消息,所以如果我从较低优先级队列中获取一个对象,如果我发现较高优先级队列中有消息,我是否会将其放回队列?<

以下是我目前的配置,可见我是个白痴。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>

知道我应该如何使用优先队列解决这个问题吗?

ps:我也想知道,Apache Camel 是否有我可以依赖的东西?

更新 1:我刚从 Apache Camel 看到这个:“https://issues.apache.org/jira/browse/CAMEL-2537”JMSPriority 上的定序器似乎是我要找的,以前有人试过这个吗?

更新 2:假设我要使用基于@Gary Russell 推荐的 RabbitMQ 插件,我有以下 spring-rabbitmq 上下文 XML 配置,这似乎很有意义(由 guest..):

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>

<bean id="Consumer" class="com.test.Consumer" />

上述 xml 配置已成功创建一个队列,名称为:“ad_google_dfa_reporting_queue”,参数参数为:x-max-priority: 10 & durable: true

但当涉及到以优先级发送消息的代码时,我完全失去了它。如何定义示例 URL 中提到的优先级:https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

更新 3:根据@Gary 的回答,我设法发送消息中设置了优先级的消息,如下图所示: message priority screenshot 但是,当我以 1-10 之间的随机优先级发送 1000 条消息时,消费者正在使用各种优先级的消息。 (我只希望首先使用高优先级消息)。以下是消息生产者的代码:

    Random random = new Random();
    for (int i=0; i< 1000; i++){
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0l + priority);
        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }

消息消费者的代码如下:

    public void consume(DfaReportingModel message) {
        System.out.println(message.getUserProfileId());

        Thread.sleep(500);
    }

我得到的结果:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,

更新 4:问题已解决!从 https://github.com/rabbitmq/rabbitmq-priority-queue 了解示例代码在我的环境中工作,我认为问题出在 spring 上下文中。因此,在对不同类型的配置进行了无数次尝试和错误之后,我指出了使它起作用的确切组合!并且如下所示:

    <rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

如果没有特别定义值为Integer类型,优先级队列不起作用。终于解决了。耶!

最佳答案

RabbitMQ 现在有一个 priority queue plugin消息按优先顺序传递。最好使用它而不是你的重新排队低优先级消息的方案,这在运行时会非常昂贵。

编辑:

当使用 rabbitTemplate.convertAndSend(...) 方法时,您想要设置消息的优先级属性,您需要实现自定义 MessagePropertiesConverter在模板中(DefaultMessagePropertiesConverter 的子类)或使用采用消息后处理器的 convertAnSend 变体;例如:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(5);
        return message;
    }
});

关于java - Spring AMQP RabbitMQ 实现优先级队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26838803/

相关文章:

java - 要解析的 'Push Approach' 和 'Pull Approach' 是什么?

java - HTTP 请求(使用 SSL)JAVA 与 AJAX

java - 如何在不同 Camel 路线上的方法之间共享对象

apache-camel - 如何在 Camel 中获取没有扩展名的文件名

python - 将 RabbitMQ 与 Plone 一起使用 - 是否使用 Celery?

rabbitmq - RabbitMQ实际上如何物理存储消息?

java - Drools 5.1.1 意外的累积和收集行为

java - 如何在 Javadoc 中包含 Google Analytics 片段?

java - .NET 等效于 Java 有界通配符 (IInterf<?>)?

java - 奇怪的 Apache Camel 异常