java - Spring rabbitmq 如何在错误处理程序上手动确认,在抛出特定的必需异常时,它仅在 Acknowledge 设置为 AUTO 时确认

标签 java spring rabbitmq spring-amqp

我们在项目中使用 Spring rabbitmq。目前对于监听器,我们正在使用手动确认模式。在这种情况下,当发生异常时,我们无法向 rabbitMQ 发送确认。

我们的代码如下:

package com.highq.listener;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.util.ErrorHandler;
import com.highq.workflow.helper.RuleEngine;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

/**
 * The Class RabbitMQListenerConfig.
 */
@Configuration
@Slf4j
@EnableRabbit
public class RabbitMQListenerConfig
{

    /** The rule engine. */
    @Autowired
    RuleEngine ruleEngine;

    private RabbitAdmin currentQueueAdmin = null;

    public RabbitAdmin getRabbitAdmin()
    {
        return currentQueueAdmin;
    }

    /**
     * This method will listen message from rabbitmq.
     *
     * @param message the message
     * @param channel - Channel created for particular listener
     * @param queue - Specified queue to which this listener will listen message
     * @param deliveryTag - Delivery Tag to be used when sending acknowledgement
     * @throws IOException - Exception in case of sending acknowledgement
     */
    @RabbitListener(queues = "${queue.name}")
    public void listen(byte[] msg, Channel channel, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException
    {
        try
        {
            String message = new String(msg, "UTF-8");
            log.info(message);
            ruleEngine.evaluateRule(message);
        }
        catch (Exception e)
        {
            log.error(e.toString());
            log.error(e.getMessage(), e);
        }
        finally
        {
            channel.basicAck(deliveryTag, false);
        }

    }

    /**
     * Error handler.
     *
     * @return the error handler
     */
    @Bean
    public ErrorHandler errorHandler()
    {
        return new RabbitExceptionHandler(new RabbitFatalExceptionStrategy(), this.currentQueueAdmin);
    }

    /**
     * Bean will create from this with given name.
     *
     * @param name - Queue name- will be instance id
     * @return the queue
     */
    @Bean
    public Queue queue(@Value("${queue.name}") String name)
    {
        return new Queue(name);
    }

    /**
     * Rabbit listener container factory.
     *
     * @param connectionFactory the connection factory
     * @return the direct rabbit listener container factory
     */
    @Bean
    public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setPrefetchCount(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public RabbitListenerAroundAdvice rabbitListenerAroundAdvice()
    {
        return new RabbitListenerAroundAdvice();
    }

    /**
     * RabbitAdmin Instance will be created which is required to create new Queue.
     *
     * @param cf - Connection factory
     * @return the rabbit admin
     */
    @Bean
    public RabbitAdmin admin(ConnectionFactory cf)
    {
        this.currentQueueAdmin = new RabbitAdmin(cf);
        return this.currentQueueAdmin;
    }

}

@Slf4j 公共(public)类 RabbitExceptionHandler 扩展了 ConditionalRejectingErrorHandler { 私有(private) RabbitAdmin rabbitAdmin;

public RabbitExceptionHandler()
{
    super();
}

/**
 * Create a handler with the supplied {@link FatalExceptionStrategy} implementation.
 * 
 * @param exceptionStrategy The strategy implementation.
 */
public RabbitExceptionHandler(FatalExceptionStrategy exceptionStrategy, RabbitAdmin rabbitAdmin)
{
    super(exceptionStrategy);
    this.rabbitAdmin = rabbitAdmin;
}

@Override
public void handleError(Throwable t)
{
    try
    {
        super.handleError(t);
    }
    catch (Exception e)
    {
        if (e instanceof AmqpRejectAndDontRequeueException)
        {
            Throwable t1 = new Throwable("Some message", e.getCause());
            log.error(e.getMessage(), e);
            ImmediateAcknowledgeAmqpException newAmqp = new ImmediateAcknowledgeAmqpException("Some message", t1);
            throw newAmqp;
        }
    }
}

package com.highq.listener;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

/**
 * The Class RabbitFatalExceptionStrategy.
 */
@Slf4j
public class RabbitFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy
{

    /*
     * (non-Javadoc)
     * @see org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.DefaultExceptionStrategy#isFatal(java.lang.Throwable)
     */
    @Override
    public boolean isFatal(Throwable t)
    {
        boolean finalResult = super.isFatal(t);
        if (t instanceof ListenerExecutionFailedException)
        {
            ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;

            lefe.getFailedMessage().getMessageProperties().getDeliveryTag());
            log.error("\n   Error occured while handling message with Queue named : "
                    + 
            lefe.getFailedMessage().getMessageProperties().getConsumerQueue() + "   Message not processed is FATAL or NOT : " + finalResult
                    + ";\n  Failed message: " + lefe.getFailedMessage());
        }
        if (!log.isWarnEnabled() || !finalResult)
        {
            log.error(t.getMessage(), t);
        }
        return true;
    }

}

在上面的例子中,我们的异常处理程序里任何地方都拿不到 channel,那么在发生异常时我们该如何手动确认?

最佳答案

你不能;使用手动确认时,所有确认逻辑都需要在监听器中。

很少需要使用手动确认,因为使用 AUTO 时容器通常已经能提供您所需的全部灵活性。

需要使用 MANUAL ack 的用例是什么?

关于java - Spring rabbitmq 如何在错误处理程序上手动确认,在抛出特定的必需异常时,它仅在 Acknowledge 设置为 AUTO 时确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52474001/

相关文章:

java - 如何检查Grails应用程序的GORM版本

javascript - 如何通过Java、JavaScript上的方法每次从SQL Server检索数据

java - 失败 - 上下文路径/MyProject 中的应用程序无法启动(JndiException 和 NameNotFoundException)

java - 使用 Hibernate 配置 Spring。创建名称为 'usersDao' : Unsatisfied dependency expressed through field 'sessionFactory' 的 bean 时出错

spring - Spring boot 中的 Rabbitmq 并发消费者

java - Windows+Selenium+Chrome=未知错误 : cannot find Chrome binary

spring - 如何为 spring ldap 提供 keystore 和信任库

java - 如何实例化 java.security.Principal?所有子类均已弃用

c# - 在C#中的rabbit Mq中获取xDeath中队列消息的最大重试次数

java - Spring Rabbit Listener关闭处理程序或返回没有重新传递标志的消息