java - ActiveMQ 死信队列未创建

标签 java spring activemq

我正在尝试使用配置和测试 ActiveMQ 死信队列,但遇到一些问题。正如官方文档所说:

When messages expire on the Active MQ broker (they exceed their time-to-live, if set) or can’t be redelivered, they’re moved to a dead-letter queue, so they can be con- sumed or browsed by an administrator at a later point. Messages are normally redelivered to a client in the following scenarios:

  1. A client is using transactions and calls rollback() on the session.
  2. A client is using transactions and closes before calling commit.
  3. A client is using CLIENT_ACKNOWLEDGE on a session and calls recover() on that session.

因此,我创建了 Producer 类来创建事务 session 并在发送消息后回滚事务。正如我所料,消息没有出现在目标队列中,但问题是 DLQ 根本没有创建。如何管理死信队列?我无法使用 JConsole 找到它 - 只有目标队列,但没有 DLQ。 enter image description here

你知道问题出在哪里吗?有Producer.java和Spring配置!

<bean id="producer" class="com.jmsexamples.Producer">
    <property name="connectionFactory" ref="jmsFactory" />
    <property name="destination" ref="queue" />
    <property name="login" value="roman" />
    <property name="password" value="sawawluha" />
</bean>

<bean id="consumer" class="com.jmsexamples.Consumer">
    <property name="connectionFactory" ref="jmsFactory" />
    <property name="destination" ref="queue" />
    <property name="login" value="roman" />
    <property name="password" value="sawawluha" />
</bean>

<bean id="jdbc_ds" class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url"
        value="jdbc:mysql://localhost:3306/ActiveMQ?relaxAutoCommit=true" />
    <property name="username" value="root" />
    <property name="password" value="" />
    <property name="poolPreparedStatements" value="true" />
</bean>

<amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:6616" />

<amq:queue id="queue" physicalName="message.queue" />


<amq:broker id="msgBroker" persistent="true">
    <amq:persistenceAdapter>
        <amq:kahaDB directory="target/activemq-data/kahadb"
            journalMaxFileLength="16mb" />
    </amq:persistenceAdapter>

    <amq:plugins>
        <amq:loggingBrokerPlugin logAll="true" />
    </amq:plugins>


    <amq:destinationPolicy>
        <amq:policyMap>
            <amq:policyEntries>
                <amq:policyEntry queue=">">
                    <amq:deadLetterStrategy>
                        <amq:individualDeadLetterStrategy
                            queuePrefix="DLQ." useQueueForQueueMessages="true"
                            processNonPersistent="true" />
                    </amq:deadLetterStrategy>
                </amq:policyEntry>
            </amq:policyEntries>
        </amq:policyMap>
    </amq:destinationPolicy>

    <amq:transportConnectors>
        <amq:transportConnector name="openwire"
            uri="tcp://localhost:6616" />
    </amq:transportConnectors>
</amq:broker>

制作人:

public class Producer {

    private String login;
    private String password;

    private ConnectionFactory connectionFactory;

    private Destination destination;

    public void setConnectionFactory(ConnectionFactory conFact) {
        this.connectionFactory = conFact;
    }

    public void setDestination(Destination dest) {
        this.destination = dest;
    }

    public void setLogin(String login) {
        this.login = login;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void sendMessage(int count) {

        Connection connection = null;
        Session session = null;

        try {
            System.out.println("Loggining as " + login);

            // it's not neccessary to set login and password in this example
            // but when you are running a normal message server, you should use
            // login and password for authentification
            connection = connectionFactory.createConnection(this.login,
                    this.password); // estabilishing connection
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // starting
                                                                                // session
            MessageProducer producer = session.createProducer(destination); // initializing
                                                                            // of
                                                                            // message
                                                                            // producer
            System.out.println("Client_acknowledge " + session.getTransacted());
            for (int i = 0; i < count; i++) {
                TextMessage message = session.createTextMessage();

                message.setText("This is text message!");
                producer.send(message);
                System.out.println("message sent #" + i);
                session.rollback();
            }

        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                session.commit();
                session.close();
                connection.close();
                System.out.println("successfull");

            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

感谢您的帮助!

最佳答案

从生产者端 - 如果 session 回滚,消息不会以 DLQ 结束。

DLQ 的目的是在不丢失任何数据的情况下为有问题的消息找到“出路”。生产者应该知道要发送什么。如果发送事务被回滚,则消息永远不会到达代理。事务消费者将确保“有毒消息”最终出现在 DLQ 上,这样它就不会阻止进一步传入的消息。

在正常情况下,生产者上的事务实际上并没有多大作用。如果您想在给定时刻生成一组消息,则可以在事务中生成具有“全有或全无”行为的消息。

关于java - ActiveMQ 死信队列未创建,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28612791/

相关文章:

Java ASCII 字符作为字符串输入

java - 使用 spring mongoTemplate 进行条件/最大 mongo 查询

Java FILE 输入输出和文件输出

java - 自定义 Controller 中的 Spring REST 响应不同

java - 交换消息标题格式

java - Java中如何比较两个相同对象的数据

java - Controller 处理程序方法支持的返回类型

java - Redis List,弹出不移除

java - Camel序列化消息内容列表

JMS 主题与队列