java - 大黄蜂Q : Core API Producer and Asynchronous Consumer problems

标签 java jakarta-ee jms hornetq

您好,我是 HornetQ 的新手。我已经编写了一个测试包,它有一个带有生产者和异步消费者的嵌入式 HornetQ 服务器。希望我已经正确实现了。现在问题如下……

  1. 当我通过 prodcuer 向队列发送消息时,它返回成功,但是当尝试使用队列中的消息时,似乎没有消息被使用。就好像消费者是不活跃的。

  2. 当我尝试使用 setSendAcknowledgementHandler 方法时出现错误:

    java.lang.IllegalStateException: You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.
        at org.hornetq.core.protocol.core.impl.ChannelImpl.setCommandConfirmationHandler(ChannelImpl.java:330)
        at org.hornetq.core.client.impl.ClientSessionImpl.setSendAcknowledgementHandler(ClientSessionImpl.java:943)
        at org.hornetq.core.client.impl.DelegatingSession.setSendAcknowledgementHandler(DelegatingSession.java:493)
        at componentsII.QueueProducer.SendMessage(QueueProducer.java:9)
        at componentsII.StartClients.main(StartClients.java:11)
    

检查下面的类。

连接和 session

public class QueueConnection {

    private static org.hornetq.api.core.TransportConfiguration transport;
    private static org.hornetq.api.core.client.ClientSessionFactory sharedfactory;
    public static org.hornetq.api.core.client.ClientSession sharedSession;
    private static java.util.HashMap<String,Object> maps;
    private static boolean started= false;
    
    
    public static void setMaps(String []key, Object[] value){
        maps= new java.util.HashMap<String, Object>() ;
        
        if(key.length!=value.length){
            maps=null;
        }else{
            for(int x=0;x<value.length;x++){
                maps.put(key[x], value[x]);
            }
        }
        
    }
    
    private static org.hornetq.api.core.client.ClientSession session(){
        try{
        if(sharedSession==null){
            sharedSession=factory().createSession(true,true,0);
        }
        }catch(org.hornetq.api.core.HornetQException e){
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }
        return sharedSession;
    }
    
    public static synchronized int size(String queueName){
        int count = 0;
        try {
            org.hornetq.api.core.client.ClientSession.QueueQuery result;
            result = sharedSession.queueQuery(new org.hornetq.api.core.SimpleString(queueName));
            count = (int) result.getMessageCount();
        } catch (org.hornetq.api.core.HornetQException e) {
            e.printStackTrace();
        } 
        return count;
    }
    public static void startSession(){
        try{
            
            if (session() != null || started!=true){
                session().start();
                started=true;
                System.out.println("Client Session started");
            }else{
                System.out.println("Client Session already started"); 
            }
        }catch(org.hornetq.api.core.HornetQException e){
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    public static void stopSession(){
        try{
            if (session() != null ){
            session().stop();
            factory().close();
            sharedSession=null;
            sharedfactory=null;
            System.out.println("Client Session stopped");
            }
        }catch(org.hornetq.api.core.HornetQException e){
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    private static org.hornetq.api.core.TransportConfiguration TransportConfigs(){
        transport=new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(),maps);
        return transport;
    }
    
    private static org.hornetq.api.core.client.ClientSessionFactory factory(){
        try{
            if(sharedfactory==null){
                org.hornetq.api.core.client.ServerLocator locator=org.hornetq.api.core.client.HornetQClient.createServerLocator(true,TransportConfigs());
                locator.setAckBatchSize(0);
                locator.setReconnectAttempts(3);
                locator.setConfirmationWindowSize(2);
                sharedfactory=locator.createSessionFactory();
            }
        }catch(Exception e){
            e.printStackTrace();
        }
        return sharedfactory;
    }
}

消费者

public class QueueConsumer {

    
    public static void Recieve(String queuename){
        try {
          org.hornetq.api.core.client.ClientConsumer consumer = QueueConnection.sharedSession.createConsumer(queuename);
              consumer.setMessageHandler(new msghandler());
        } catch (org.hornetq.api.core.HornetQException e) {
            e.printStackTrace();
        } catch(Exception e){
            e.printStackTrace();
        }
    }
    
    private static class msghandler implements org.hornetq.api.core.client.MessageHandler {
        @Override
        public void onMessage(org.hornetq.api.core.client.ClientMessage msg) {
            System.out.println("Message consumed ~"+msg.getStringProperty("myMsg"));
            
        }
    }
}

制作人

public class QueueProducer {

    public static void SendMessage(String queuename,String msg){
        try{
            QueueConnection.sharedSession.setSendAcknowledgementHandler(new acknowledegeHandler());
            org.hornetq.api.core.client.ClientProducer producer= QueueConnection.sharedSession.createProducer(queuename);
            org.hornetq.api.core.client.ClientMessage message = QueueConnection.sharedSession.createMessage(org.hornetq.api.core.client.ClientMessage.TEXT_TYPE,true);
            message.putStringProperty("myMsg", msg);
            producer.send(queuename,message);
            System.out.println("Message Sent to "+queuename);
        
        }catch(org.hornetq.api.core.HornetQException e){
            e.printStackTrace();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    private static class acknowledegeHandler implements org.hornetq.api.core.client.SendAcknowledgementHandler{
        @Override
        public void sendAcknowledged( org.hornetq.api.core.Message msg) {
            System.out.println("Received acknowledgement for message ~: "+msg.getStringProperty("myMsg"));
        }
    }
}

初始化客户端

public class StartClients {

    private static void initialize(){
        String keys[]={org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME};
        Object values[]={"localhost",5664};
        
        QueueConnection.setMaps(keys,values);
        QueueConnection.startSession();
    } 
    public static void main(String []args){
        
        new Thread(){@Override
            public void run(){
                System.out.println("Producer Thread");
                StartClients.initialize();
                String QueueName= "queues.Queue1";
                
                for(int a=1;a<10;a++){
                    QueueProducer.SendMessage(QueueName, "Message "+a+" to the embedded HornetQ Server");
                }
                System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages on send");
                QueueConnection.stopSession();
            }}.start();
        
        new Thread(){@Override
            public void run(){
                try{
                    sleep(5000);
                    System.out.println();
                    System.out.println("Consumer Thread");
                    StartClients.initialize();
                    String QueueName= "queues.Queue1";
                    QueueConsumer.Recieve(QueueName);
                    System.out.println("Queue "+QueueName+" has "+QueueConnection.size(QueueName)+" messages after consumption");
                    QueueConnection.stopSession();
                }catch(java.lang.InterruptedException e){
                    e.printStackTrace();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }}.start();
            
    }
}

嵌入式服务器

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.embedded.EmbeddedHornetQ;

public class QueueServer {
    public static void StartServer(){

        try {

             //Connection configurations
            Map<String, Object> params = new HashMap<String, Object>();
            params.put(TransportConstants.HOST_PROP_NAME, "localhost");
            params.put(TransportConstants.PORT_PROP_NAME, 5664);
            params.put(TransportConstants.USE_NIO_PROP_NAME, true);
            params.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
            params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
            
            Map<String, Object> params2 = new HashMap<String, Object>();
            params2.put(TransportConstants.HOST_PROP_NAME, "localhost");
            params2.put(TransportConstants.PORT_PROP_NAME, 5665);
            params2.put(TransportConstants.USE_NIO_PROP_NAME, true);
            params2.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
            params2.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
            
            Map<String, Object> params3 = new HashMap<String, Object>();
            params3.put(TransportConstants.HOST_PROP_NAME, "localhost");
            params3.put(TransportConstants.PORT_PROP_NAME, 5666);
            params3.put(TransportConstants.USE_NIO_PROP_NAME, true);
            params3.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
            params3.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
            
            Map<String, Object> params4 = new HashMap<String, Object>();
            params4.put(TransportConstants.HOST_PROP_NAME, "localhost");
            params4.put(TransportConstants.PORT_PROP_NAME, 5667);
            params4.put(TransportConstants.USE_NIO_PROP_NAME, true);
            params4.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
            params4.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME,3);
            
            //Server configurations
            Configuration config= new ConfigurationImpl();
            HashSet<TransportConfiguration>transports= new HashSet <TransportConfiguration>();
            transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params));
            transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params2));
            transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params3));
            transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(),params4));
            
            
            //Queues Configurations
            List<CoreQueueConfiguration> queueConfigs = new ArrayList<CoreQueueConfiguration>();
            String queueName="queues.Queue";
            for(int x=1;x<5;x++){
                queueConfigs.add(new CoreQueueConfiguration(queueName.concat(String.valueOf(x)),queueName.concat(String.valueOf(x)), null, true));
            }
         
            //Set Configurations
            config.setAcceptorConfigurations(transports);
            config.setQueueConfigurations(queueConfigs);
            config.setJournalType(JournalType.NIO);
            config.setSecurityEnabled(false);
                
            //Starting server
            EmbeddedHornetQ embedded = new EmbeddedHornetQ();
            embedded.setConfiguration(config);
            embedded.start();
            Thread.sleep(6000000);
            embedded.stop();
                
        } catch (Exception ex) {
            Logger.getLogger(QueueServer.class.getName()).log(Level.SEVERE, null, ex);
        }
        
    }
    public static void main(String [] args){
        StartServer();    
    }
}

我哪里出错了?

最佳答案

需要在ClientSessionFactory中定义确认窗口大小相关的结算方法。

你可以在这里查看文档

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/client-reconnection.html

如果还要检查

HornetQ Messaging developers' guide

关于java - 大黄蜂Q : Core API Producer and Asynchronous Consumer problems,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14094101/

相关文章:

Java EE 7 updatetool 安装失败

java - 如何在共享首选项中保存搜索栏拇指位置

Java Lambda 表达式 - 必须转换参数?

java - 让 log4j 以 UTC 时间登录

java - 如何使用Java从IE中的服务器下载xml文件?

java - Maven 依赖引用旧的 Poi 版本

java - JMS。在 WildFly 12 中向 ActiveMQ 中的主题发送简单的主题消息失败

Spring JMS 监听器 - 如何根据条件读取消息

java - NoSuchProviderException : smtp with log4j SMTP appender

java - 如何追溯这个编译错误?