rabbitmq - 在 RabbitMQ 中发布消息时发送 MessageProperties [priority=anyInteger]

标签 rabbitmq spring-integration spring-amqp

我们在我们的项目中使用rabbit MQ和Spring Integration。每个消息都有一个传递模式、 header 、属性和有效负载部分。 我们想要添加属性,即)优先级值为 2(任何整数),有效负载为“测试消息 3”,并将消息发布到名为 OES 的队列。请查看屏幕截图。

Adding properties[priority=3] in rabbitmq message

如何在下面的出站 channel 适配器(Spring Integration)中添加消息属性,即优先级=2(或任何值)。我知道我们可以通过添加“mapped-request-headers”来添加“headers”,但我想添加属性。没有为“出站 channel 适配器”中的 MessageProperties 定义任何属性。有没有办法解决这个问题。

我们的有效负载没有问题,它已经开始运行了。我们只想添加优先级=2(任何值)的 MessageProperties。如何将其添加到出站 channel 适配器中(不需要硬编码,应该是通用的)?

<!-- the mapped-request-headers should be symmetric with 
     the list on the consumer side defined in consumerbeans.consumerHeaderMapper() -->
<int-amqp:outbound-channel-adapter id="publishingAmqpAdapter" 
    channel="producer-processed-event-channel" 
    amqp-template="amqpPublishingTemplate"
    exchange-name="events_forwarding_exchange"
    routing-key-expression="headers['routing-path']"
    mapped-request-headers="X-CallerIdentity,routing-path,content-type,route_to*,event-type,compression-state,STANDARD_REQUEST_HEADERS"
/>

其他配置:

<!-- chain routes and transforms the ApplicationEvent into a json string -->
<int:chain id="routingAndTransforming"
    input-channel="producer-inbound-event-channel"
    output-channel="producer-routed-event-channel">
    <int:transformer ref="outboundMessageTracker"/>
    <int:transformer ref="messagePropertiesTransformer"/>
    <int:transformer ref="eventRouter"/>
    <int:transformer ref="eventToJsonTransformer"/>
</int:chain>

<int:transformer id="messagePayloadCompressor" 
   input-channel="compress-message-payload" 
   output-channel="producer-processed-event-channel"
   ref="payloadCompressor"/>

@Configuration("amqpProducerBeans")
@ImportResource(value = "classpath:com/apple/store/platform/events/si/event-producer-flow.xml")
public class AmqpProducerBeans {

    @Bean(name = { "amqpPublishingTemplate" })
        public AmqpTemplate amqpTemplate() {
            logger.debug("creating amqp publishing template");
            RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory());
            SimpleMessageConverter converter = new SimpleMessageConverter();
            // following needed for retry logic
            converter.setCreateMessageIds(true);
            rabbitTemplate.setMessageConverter(converter);
            return rabbitTemplate;
        }

/*Other code commented */

}

其他代码:

import org.springframework.integration.Message;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.message.GenericMessage;

public class PayloadCompressor {

    @Transformer
    public Message<byte[]> compress(Message<String> message){
        /* some code commented */

        Map<String, Object> headers = new HashMap<String, Object>();
        headers.putAll(message.getHeaders());
        headers.remove("compression-state");
        headers.put("compression-state", CompressionState.COMPRESSED);
        Message<byte[]> compressedMessage = new GenericMessage<byte[]>(compressedPayload, headers);
      return compressedMessage;


    }

如果我们没有使用spring集成,那么我们可以使用下面的channel.basicPublish方式并发送MessageProperties。

ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("10.102.175.30");
factory.setUsername("rahul");
factory.setPassword("rahul");
factory.setPort(5672);
Connection connection = factory.newConnection();
System.out.println("got connection "+connection);
Channel channel = connection.createChannel();
MessageProperties msgproperties= new MessageProperties() ;
MessageProperties.BASIC.setPriority(3);
// set Messageproperties with priority
    String exchangeName = "HeaderExchange";
      String routingKey = "testkey";
      //routingkey
      byte[] messageBodyBytes = "Message having priority value 3".getBytes();
      channel.basicPublish(exchangeName,
                           routingKey,
                           true,
                           msgproperties.BASIC,
                           messageBodyBytes);

如果您需要更多详细信息,请告诉我。

最佳答案

属性已自动映射 - 请参阅 header mapper .

只需使用 <header-enricher/>设置适当的 header ,它将映射到正确的属性。在优先级的情况下,常量为 here有关 amqp 特定的 header 常量,请参阅 here .

关于rabbitmq - 在 RabbitMQ 中发布消息时发送 MessageProperties [priority=anyInteger],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29773421/

相关文章:

rabbitmq - 具有 2 个队列的 Spring AMQP 项目

python - celery - 完成任务时调用函数

java - 读取 TCP 回复消息时出现 MessageTimeoutException

java - Spring Boot集成url分页解决方案

java - Spring AMQP 多用户

java - Spring AMQP - 将消息返回到队列的开头

java - 使用 amqp 从队列中多路分解消息以在并行流中处理?

Ubuntu :Cannot acess RabbitMq Web management console 上的 RabbitMq

spring-boot - XMPP Spring 集成 - 聊天

java - Springboot RabbitMQ 与并发消费者