java - Messages are not redelivered when using a JBoss topic

Tags: java jboss jms

I am using JBoss 4.0.2 GA.

I am using the testTopic defined in jbossmq-destinations-service:

<mbean code="org.jboss.mq.server.jmx.Topic"
       name="jboss.mq.destination:service=Topic,name=testTopic">
  <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
  <attribute name="SecurityConf">
    <security>
      <role name="guest" read="true" write="true"/>
      <role name="publisher" read="true" write="true" create="false"/>
      <role name="durpublisher" read="true" write="true" create="true"/>
    </security>
  </attribute>
  <attribute name="RedeliveryDelay">0</attribute>
</mbean>

I wrote a single class that acts as both the publisher and the subscriber:

package com.nagarro.client;

import javax.jms.*;
import javax.naming.*;

import java.io.*;
import java.util.Properties;

public class Chat implements javax.jms.MessageListener {
    private TopicSession pubSession;
    private TopicSession subSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String username;

    /* Constructor. Establish JMS publisher and subscriber */
    public Chat(String topicName, String username, String password)
            throws Exception {
        // Obtain a JNDI connection
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        properties.put("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        properties.setProperty(Context.PROVIDER_URL, "localhost:1099");
        // ... specify the JNDI properties specific to the vendor

        InitialContext jndi = new InitialContext(properties);

        // Look up a JMS connection factory
        TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi
                .lookup("TopicConnectionFactory");

        // Create a JMS connection
        TopicConnection connection = conFactory.createTopicConnection();

        // Create two JMS session objects
        TopicSession pubSession = connection.createTopicSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        TopicSession subSession = connection.createTopicSession(false,
                Session.CLIENT_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic chatTopic = (Topic) jndi.lookup(topicName);

        // Create a JMS publisher and subscriber
        TopicPublisher publisher = pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);

        // Set a JMS message listener
        subscriber.setMessageListener(this);

        // Initialize the Chat application
        set(connection, pubSession, subSession, publisher, username);

        // Start the JMS connection; allows messages to be delivered
        connection.start();

    }

    /* Initialize the instance variables */
    public void set(TopicConnection con, TopicSession pubSess,
            TopicSession subSess, TopicPublisher pub, String username) {
        this.connection = con;
        this.pubSession = pubSess;
        this.subSession = subSess;
        this.publisher = pub;
        this.username = username;
    }

    /* Receive message from topic subscriber */
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println(text);
            // Deliberately throw to test whether the message is redelivered
            if (textMessage != null) {
                throw new NullPointerException();
            }
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /* Create and send message using topic publisher */
    protected void writeMessage(String text) throws JMSException {
        TextMessage message = pubSession.createTextMessage();
        message.setText(username + " : " + text);
        publisher.publish(message);
    }

    /* Close the JMS connection */
    public void close() throws JMSException {
        connection.close();
    }

    /* Run the Chat client */
    public static void main(String[] args) {
        try {
            if (args.length != 3)
                System.out.println("Topic or username missing");

            // args[0]=topicName; args[1]=username; args[2]=password
            Chat chat = new Chat("topic/testTopic", "", "");

            // Read from command line
            BufferedReader commandLine = new java.io.BufferedReader(
                    new InputStreamReader(System.in));

            // Loop until the word "exit" is typed
            while (true) {
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    chat.close(); // close down connection
                    System.exit(0);// exit program
                } else
                    chat.writeMessage(s);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

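In the code above I use the CLIENT_ACKNOWLEDGE session mode, but even though I throw a NullPointerException from the onMessage listener method, the message is not redelivered.

If redelivery requires any configuration change, please let me know.

Best answer

For a topic, if you call topicSession.recover() after reading the message, the message is redelivered immediately (for example, in the catch block of a try/catch when an exception occurs).

Tested with JBoss 7, a non-durable topic, Session.CLIENT_ACKNOWLEDGE, and setMessageListener: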

topicSubscriber.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        try {
            throw new IllegalStateException("Test");
        } catch (Exception ex) {
            try {
                topicSession.recover();
            } catch (JMSException e) {
                e.printStackTrace();
            }

            ex.printStackTrace();
        }
    }
});

This resulted in 10 immediate redeliveries, after which the message was moved to the dead letter queue.
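
To make the control flow explicit, here is a minimal sketch of the "acknowledge on success, recover on failure" pattern. It is deliberately JMS-free so that it runs standalone: the class `AckOrRecover`, the `Action` interface and the `handle` method are hypothetical names invented for this illustration, not part of JMS or JBoss. In a real listener the three actions would presumably wrap your message processing, `Message.acknowledge()` and `Session.recover()`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper illustrating the acknowledge-or-recover flow for a
 * CLIENT_ACKNOWLEDGE listener. It does not use the JMS API; the actions
 * stand in for message processing, Message.acknowledge() and Session.recover().
 */
final class AckOrRecover {

    /** An action that may throw a checked exception, as JMS calls do. */
    interface Action {
        void run() throws Exception;
    }

    /**
     * Runs the processing step and acknowledges on success. If processing
     * (or the acknowledge call itself) throws, asks for redelivery instead.
     * Returns true only if the message was acknowledged.
     */
    static boolean handle(Action process, Action acknowledge, Action recover) {
        try {
            process.run();
            acknowledge.run();
            return true;
        } catch (Exception failure) {
            try {
                recover.run();
            } catch (Exception recoverFailure) {
                recoverFailure.printStackTrace();
            }
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        boolean acknowledged = handle(
                () -> { throw new IllegalStateException("Test"); }, // simulated failure
                () -> calls.add("acknowledge"),
                () -> calls.add("recover"));
        System.out.println(acknowledged + " " + calls); // prints "false [recover]"
    }
}
```

Applied to the Chat class from the question, this would mean calling message.acknowledge() after the message has been printed, and calling subSession.recover() from a catch block that also catches the runtime exception, rather than letting the exception escape from onMessage.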

Regarding java - Messages are not redelivered when using a JBoss topic, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/18490181/
