java - JMS 与 akka 和多线程

标签 java multithreading jms akka qpid

public class QueueListener implements MessageListener {

    public static final ExecutorService executor = Executors.newWorkStealingPool();

    public static boolean isActorinit=false;
    public static ActorSystem system=null;
    private ActorRef myActor=null;
    private String _queueName=null; 

    public QueueListener(String qName){
        this._queueName = qName;
        if(!isActorinit){
            system=ActorSystem.create("Controller");

            try {
            myActor=system.actorOf(Props.create(MessageExecutor.class.getConstructor(String.class).newInstance(_queueName).getClass()),"mysysActor");
            } catch (Exception e) {
                // TODO Auto-generated catch block
            }
            isActorinit=true;
        }
    }

    /* 
     * (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message msg) {

//      processRequest(msg);
        executeRequest(msg);
    }

    /** This method will process the message fetch by the listener.
     *   
     * @param msg - javax.jms.Messages parameter get queue message
     */
    private void processRequest(Message msg){

        String requestData=null;
        try {

            if(msg instanceof TextMessage){
                TextMessage textMessage= (TextMessage) msg;
                requestData = textMessage.getText().toString();
            }else if(msg instanceof ObjectMessage){
                ObjectMessage objMsg = (ObjectMessage) msg; 
                requestData = objMsg.getObject().toString();
            }


            MessageProcessor msgProcessor = new MessageProcessor(_queueName, requestData);
            executor.submit(msgProcessor);
        } catch (JMSException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

    private void executeRequest(Message msg){

        String requestData=null;
        try {

            if(msg instanceof TextMessage){
                TextMessage textMessage= (TextMessage) msg;
                requestData = textMessage.getText().toString();
            }else if(msg instanceof ObjectMessage){
                ObjectMessage objMsg = (ObjectMessage) msg; 
                requestData = objMsg.getObject().toString();
            }
//           MessageExecutor objMessageExecutor=new MessageExecutor(_queueName);
            myActor.tell(requestData, ActorRef.noSender()); 

        } catch (JMSException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

}

当使用 ExecutorService 执行 ProcessRequst 方法时,此代码工作正常。然而,在 akka actor 系统实现方面面临以下问题。

Exception in thread "Thread-4" java.lang.NullPointerException
    at com.syn.jms.listener.QueueListener.executeRequest(QueueListener.java:102)
    at com.syn.jms.listener.QueueListener.onMessage(QueueListener.java:59)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
    at java.lang.Thread.run(Thread.java:745)

请注意,我使用 Apache qpid APi 来实现带有 activeMQ 的 AMQP 协议(protocol)。

我无法理解这个问题。

最佳答案

我找到了解决方案,这是由于 Actor ref 的 NPE,同时为每个进程队列使用唯一的 actorRef 处理多个输入,并且它没有初始化对象。我找到了这个解决方案。

public QueueListener(String actorId,String qName){
        this._queueName = qName;
         if(!isActorinit){
                system=ActorSystem.create(actorId);

                isActorinit=true;
            }

          myActor=system.actorOf( Props.create(MessageExecutor.class, qName),qName);
    }

但是,我感谢您为我提供解决方案的意见。谢谢

关于java - JMS 与 akka 和多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34547149/

相关文章:

java - Jsf2 View 参数和 viewscoped beans

c++ - 使线程按顺序重做打印功能

java - 从 Camel 连接的 Weblogic JMS URL

java - JBoss JMS 提供商

java - JMS 连接到没有任何特定库的队列

Java:Selenium 将文本发送到错误的字段

javassist:重命名类和字段类型

multithreading - 在后台线程中限制同时运行带有信号量的异步协程

java - 如何从 3 个整数组成 HashMap 的键?

java - 如何在基本 RPG 游戏中使用 Thread.sleep