java - Activemq Topic Consumer 消费某些消息失败

标签 java android activemq stomp

我有一个生产者/消费者客户端,其主题是使用 Stomp 协议(protocol)的 Active MQ。 我使用 Gozirra。

问题不在于对话另一端产生的所有消息 如下编码到达消费者客户端。 一些消息成功到达,但其他消息无法到达。

我听说消息丢失是由于实现的消息传递的异步性质 基于消费者/生产者模型。

有人说使用收据机制可能会有所帮助。

您认为问题是什么?

============================================= ==========================

代码如下

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.LoginException;
import com.lnisoft.ontocept.StringEscapeUtils;
import net.ser1.stomp.Client;
import net.ser1.stomp.Listener;
import android.os.AsyncTask;


// * Active MQ Stomp client using Gozirra

public class Communicator implements Listener 
{
private static Communicator instance;
private Client stomp_client = null;
private ConnectToOntoceptAsync connector = null;

private String emailAsUserIdentifier = "";
private String topicName = null;
private String messageToSend = "NO_MESSAGE";

private boolean isLoggedIn = false;
private StreamConverter converter = null;
private String errorMessage = "";
private boolean hasError = false;
public HashMap<String, MessageStack> perCommunicatorUserMessageStackTbl = new HashMap<String, MessageStack>();

public Communicator()
{
     converter = new StreamConverter();
}

public static Communicator getInstance()
{
     if ( instance == null )
     {
          instance = new Communicator();
     }

     return instance;
}

@Override 
public void message( Map headers, String body ) 
{ 
      String ascii_to_unicode = StringEscapeUtils.unescapeJava( body ); 

      if ( ((String)headers.get("sender")).contains("ontocept") )
      {
          storeMessageFromOntocept( ascii_to_unicode );
      }
      else
      {
          System.out.println("my message : "+ ascii_to_unicode);               
          System.out.println("\n"); 
      }
} 

public boolean getIsLoggedIn()
{
     return this.isLoggedIn;
}

public void initializeCommunicator( String _emailAsUserIdentifier ) throws LoginException, IOException
{       
    this.emailAsUserIdentifier = _emailAsUserIdentifier;

    topicName = "/topic/"+ this.emailAsUserIdentifier;

    connector = new ConnectToOntoceptAsync();
    connector.execute();
}

public boolean isIntialized() 
{
     if ( isLoggedIn == false )
     {
           return false;
     }
     else
     {
          return true;
     }
}

protected class ConnectToOntoceptAsync extends AsyncTask<String, Void, String> 
{       
    @Override
    protected String doInBackground( String... params )  
    {
        try 
        { 
             stomp_client = new Client( "***.***.**.***", 61613, emailAsUserIdentifier,  "1234" );  
             stomp_client.subscribe( topicName, Communicator.this  );
             isLoggedIn = true; 

        } 
        catch (LoginException e) 
        { 
            e.printStackTrace(); 
        } 
        catch (IOException e) 
        { 
            e.printStackTrace(); 
        } 

         return "Executed";
    }

    @Override
    protected void onPostExecute(String text) 
    {
          // TODO 
    }
}

public boolean hasError()
{
     return this.hasError;
}

public String getError()
{
     return this.errorMessage;
}

public void disconnect()
{
    stomp_client.unsubscribe( this.topicName );
    stomp_client.disconnect();
    this.isLoggedIn = false;
}

public void sendMessageToOntocept( String _messageText )
{
     this.messageToSend = _messageText;

     new SendAsyncMessageToOntocept().execute();
}

private class SendAsyncMessageToOntocept extends AsyncTask<String, Void, String>
{
    @Override
    protected String doInBackground( String... params )  
    { 
        String unicode_formatted_message = converter.convertToUnicodeText( messageToSend  ); 

        Map<String, String> header = new HashMap<String,String>();

        // * http://www.germane-software.com/software/Java/Gozirra/
        header.put( "type", "text/plain" );  
        header.put( "sender", "user" );

        stomp_client.send( topicName, unicode_formatted_message, header );

        try 
        { 
            Thread.sleep(2000); 
        } 
        catch (InterruptedException e) 
        { 
            e.printStackTrace(); 
        } 

        Thread.yield(); 

        return "Executed";
    }

    @Override
    protected void onPostExecute(String text) 
    { }
}

public boolean hasNewMessage( String _messageRecipientIdentifier )
{ 
      if ( _messageRecipientIdentifier.contains("quiz_activity") )
      {
            if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
            {
                   return false;
            }
            else
            {
                 return perCommunicatorUserMessageStackTbl.get("quiz_activity").existNewMessage();
            }
      }
      else if ( _messageRecipientIdentifier.contains("vms_activity") )   // VisualizeMemorySpace 액티비티 약자
      {
            if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                    return false;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("vms_activity").existNewMessage();
            }
      }
      else  // _messageRecipientIdentifier = Ontocept_Activity
      { 
            if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                   return false;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("ontocept_activity").existNewMessage();
            }
      }
}

public Message getNewMessage( String _messageRecipientIdentifier )
{
      if ( _messageRecipientIdentifier.contains("quiz_activity") )
      {
            if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
            {
                  return null;
            }
            else
            {
                 return perCommunicatorUserMessageStackTbl.get("quiz_activity").getNewMessage();
            }
      }
      else if ( _messageRecipientIdentifier.contains("vms_activity") ) 
      {
            if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                  return null;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("vms_activity").getNewMessage();
            }
      }
      else  // _messageRecipientIdentifier = Ontocept_Activity
      { 
            if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                   return null;
            }
            else
            {
                   return perCommunicatorUserMessageStackTbl.get("ontocept_activity").getNewMessage();
            }
      }
}

public String convertToUnicodeText( String str )
{
     StringBuffer ostr = new StringBuffer();

     for ( int i=0; i<str.length(); i++) 
     {
            char ch = str.charAt(i);

            if ((ch >= 0x0020) && (ch <= 0x007e))   // Does the char need to be converted to unicode? 
            {
                    ostr.append(ch);                    // No.
            } 
            else                                    // Yes.
            {
                    ostr.append("\\u") ;                // standard unicode format.
                    String hex = Integer.toHexString(str.charAt(i) & 0xFFFF);   // Get hex value of the char. 
                    for(int j=0; j<4-hex.length(); j++) // Prepend zeros because unicode requires 4 digits
                        ostr.append("0");
                    ostr.append(hex.toLowerCase());     // standard unicode format.
                    //ostr.append(hex.toLowerCase(Locale.ENGLISH));
            }
        }

      return (new String(ostr));        //Return the stringbuffer cast as a string.
}

private void storeMessageFromOntocept( String messageObjectString ) 
{ 
      Message messageObject =   MessageObjectGenerator.getInstance().parseMessageStringIntoMessageObject( messageObjectString );

      if ( messageObject.getRecipient().contains("ontocept_activity") )
      {
            if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
            }
      }
      else if ( messageObject.getRecipient().contains("vms_activity") )   
      {
            if ( this.perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "vms_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("vms_activity").addMessage( messageObject );
            }
      }
      else 
      { 
            if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
            {
                  MessageStack ms = new MessageStack();

                  ms.addMessage( messageObject );

                  this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
            }
            else
            {
                  this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
            }
      }
}

public HashMap<String, MessageStack> getMessageStack()
{
     return this.perCommunicatorUserMessageStackTbl;
}
}

最佳答案

这似乎是一个 Android 应用程序。因此要求应用始终处于 Activity 状态,否则与服务器的通信将中断,导致消息丢失。

您确定将从后台线程中调用 Listener 吗?要连接到服务器,您的 AsyncTask ConnectToOntoceptAsync 会运行并立即返回。在 Android 上,所有网络通信都必须在单独的线程上进行。在我看来,这看起来很可疑。

我会在后台线程上进行所有通信,因此 Stomp 客户端是 AsyncTask 的私有(private)字段。此外,我更喜欢使用阻塞接收方法,而不是使用 Listener 接口(interface),它提供更详细的控制。

关于java - Activemq Topic Consumer 消费某些消息失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26446285/

相关文章:

java - 带有根元素的 Jackson JSON 反序列化

java - 使用JNA将 native C函数映射到Java接口(interface)时出现指针问题

java - 为什么我的流复制过程性能稳定下降

Android Studio SuppressLint "ClickableViewAccessibility"不工作

java - 无法发布到 Linux 上 ActiveMQ 上的队列

java - 无需浏览器即可打开并监视运行 Flash 应用程序的窗口

javascript - 如何使用userAgent识别android和linux用户

android - 进程退出时重置 Android Widget

java.io.EOFException - ActiveMQ 消息代理

exception - 通过 JMX 访问 Apache ActiveMQ 抛出异常