spring - How to manually acknowledge and remove a message from a RabbitMQ queue using JmsTemplate

Tags: spring spring-boot rabbitmq spring-jms

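I am using RabbitMQ (with JMS) together with JmsTemplate. I can consume messages from the RabbitMQ queue, but they are acknowledged in AUTO mode.

I have searched the API but could not find a way to change this.

How can I set manual acknowledgment?

In the code below, when a message is consumed from the queue, I want to call a web service with that message and, depending on the response, remove the message from the queue.
I created one project that uses a listener, and another project that makes a call to read a message from the queue.

First project: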

package com.es.jms.listener;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

import com.rabbitmq.jms.admin.RMQConnectionFactory;

@Configuration
public class RabbitMqMessageListener {

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");

        return connectionFactory;
    }

    @Bean
    public MessageListener msgListener() {
        return new MessageListener() {
            public void onMessage(Message message) {

                System.out.println(message.toString());
                if (message instanceof TextMessage) {
                    try {
                        String msg = ((TextMessage) message).getText();
                        System.out.println("Received message: " + msg);

                        // call web service here and depends on web service
                        // response
                        // if 200 then delete msg from queue else keep msg in
                        // queue

                    } catch (JMSException ex) {
                        throw new RuntimeException(ex);
                    }
                }

            }
        };
    }

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(jmsConnectionFactory());
        container.setDestinationName("test");

        container.setMessageListener(msgListener());
        return container;

    }
}

Second project:

package com.rabbitmq.jms.consumer.controller;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.jms.ConnectionFactory;

import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;


import com.rabbitmq.jms.admin.RMQConnectionFactory;

import redis.clients.jedis.Jedis;

@Controller
public class ReceiverController {
    @Autowired
    JmsTemplate jmsTemplate;


    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("Username");
        connectionFactory.setPassword("Password");
        connectionFactory.setVirtualHost("vhostname");
        connectionFactory.setHost("hostname");

        return connectionFactory;
    }

    @CrossOrigin
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @RequestMapping(method = RequestMethod.GET, value = "/getdata")
    @ResponseBody
    public ResponseEntity<String> fecthDataFromRedis()
            throws JSONException, InterruptedException, JmsException, ExecutionException, TimeoutException {
        System.out.println("in controller");

        jmsTemplate.setReceiveTimeout(500L);
        // receiveAndConvert returns null if no message arrives within the timeout
        String message = (String) jmsTemplate.receiveAndConvert("test");

        // call web service here and depends on web service
        // response
        // if 200 then delete msg from queue else keep msg in
        // queue
        System.out.println(message);

        return new ResponseEntity<>(message, HttpStatus.OK);

    }

}

How can I achieve this?

Thanks in advance.

Best answer

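You are not using a JmsTemplate; you are using a SimpleMessageListenerContainer to receive the message.

If you were using the template, you would have to use the execute method with a SessionCallback, because the acknowledgment must occur within the scope of the session in which the message was received.

However, with the SimpleMessageListenerContainer, you simply set the sessionAcknowledgeMode to Session.CLIENT_ACKNOWLEDGE. See the container javadocs...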

/**
 * Message listener container that uses the plain JMS client API's
 * {@code MessageConsumer.setMessageListener()} method to
 * create concurrent MessageConsumers for the specified listeners.
 *
 * <p>This is the simplest form of a message listener container.
 * It creates a fixed number of JMS Sessions to invoke the listener,
 * not allowing for dynamic adaptation to runtime demands. Its main
 * advantage is its low level of complexity and the minimum requirements
 * on the JMS provider: Not even the ServerSessionPool facility is required.
 *
 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
 * on acknowledge modes and transaction options. Note that this container
 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
 * that is, automatic message acknowledgment after listener execution,
 * with no redelivery in case of a user exception thrown but potential
 * redelivery in case of the JVM dying during listener execution.
 *
 * <p>For a different style of MessageListener handling, through looped
 * {@code MessageConsumer.receive()} calls that also allow for
 * transactional reception of messages (registering them with XA transactions),
 * see {@link DefaultMessageListenerContainer}.
   ...
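
The container setup described above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name ClientAckListenerConfig and the callWebService helper are hypothetical placeholders, and the ConnectionFactory bean is assumed to be the one defined in the question. As far as I understand the AbstractMessageListenerContainer javadoc, with CLIENT_ACKNOWLEDGE the container acknowledges the message itself after the listener returns normally, so the listener only needs to throw when the message must not be acknowledged.

```java
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

@Configuration
public class ClientAckListenerConfig {

    // Hypothetical stand-in for the real web service call; returns an HTTP status code.
    static int callWebService(String payload) {
        return 200;
    }

    @Bean
    public MessageListener ackAwareListener() {
        return message -> {
            if (!(message instanceof TextMessage)) {
                return; // this sketch simply accepts non-text messages
            }
            try {
                String body = ((TextMessage) message).getText();
                if (callWebService(body) != 200) {
                    // Throwing keeps the container from acknowledging this message.
                    throw new IllegalStateException("Web service did not return 200");
                }
                // Normal return: the container acknowledges the message.
            } catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Bean
    public SimpleMessageListenerContainer clientAckContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("test");
        // Switch from the default AUTO_ACKNOWLEDGE to client acknowledgment.
        container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        container.setMessageListener(ackAwareListener());
        return container;
    }
}
```

Whether and when an unacknowledged message is redelivered after the listener throws depends on the Spring version and on the broker, so it is worth checking the AbstractMessageListenerContainer javadoc for the version in use.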

Edit

When using the JmsTemplate, you have to do your work within the scope of the session. Here's how...

First, you have to enable client acknowledgment in the template...
this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

Then, use the execute method with a SessionCallback...
Boolean acknowledged = this.jmsTemplate.execute(session -> {
    MessageConsumer consumer = session.createConsumer(
            this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "bar", false));
    try {
        Message received = consumer.receive(5000);
        if (received == null) {
            return false; // nothing arrived within 5 seconds
        }
        String payload = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);

        // Do some stuff here (for example, call the web service with the payload).

        // Acknowledge only after the work has succeeded.
        received.acknowledge();
        return true;
    }
    catch (Exception e) {
        // Nothing is acknowledged on this path.
        return false;
    }
    finally {
        consumer.close();
    }
}, true);

Regarding "spring - How to manually acknowledge and remove a message from a RabbitMQ queue using JmsTemplate", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43305346/
