我有一个生产者/消费者客户端,其主题是使用 Stomp 协议(protocol)的 Active MQ。 我使用 Gozirra。
问题不在于对话另一端产生的所有消息 如下编码到达消费者客户端。 一些消息成功到达,但其他消息无法到达。
我听说消息丢失是由于实现的消息传递的异步性质 基于消费者/生产者模型。
有人说使用收据机制可能会有所帮助。
您认为问题是什么?
============================================= ==========================
代码如下
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.LoginException;
import com.lnisoft.ontocept.StringEscapeUtils;
import net.ser1.stomp.Client;
import net.ser1.stomp.Listener;
import android.os.AsyncTask;
// * Active MQ Stomp client using Gozirra
public class Communicator implements Listener
{
private static Communicator instance;
private Client stomp_client = null;
private ConnectToOntoceptAsync connector = null;
private String emailAsUserIdentifier = "";
private String topicName = null;
private String messageToSend = "NO_MESSAGE";
private boolean isLoggedIn = false;
private StreamConverter converter = null;
private String errorMessage = "";
private boolean hasError = false;
public HashMap<String, MessageStack> perCommunicatorUserMessageStackTbl = new HashMap<String, MessageStack>();
public Communicator()
{
converter = new StreamConverter();
}
public static Communicator getInstance()
{
if ( instance == null )
{
instance = new Communicator();
}
return instance;
}
@Override
public void message( Map headers, String body )
{
String ascii_to_unicode = StringEscapeUtils.unescapeJava( body );
if ( ((String)headers.get("sender")).contains("ontocept") )
{
storeMessageFromOntocept( ascii_to_unicode );
}
else
{
System.out.println("my message : "+ ascii_to_unicode);
System.out.println("\n");
}
}
public boolean getIsLoggedIn()
{
return this.isLoggedIn;
}
public void initializeCommunicator( String _emailAsUserIdentifier ) throws LoginException, IOException
{
this.emailAsUserIdentifier = _emailAsUserIdentifier;
topicName = "/topic/"+ this.emailAsUserIdentifier;
connector = new ConnectToOntoceptAsync();
connector.execute();
}
public boolean isIntialized()
{
if ( isLoggedIn == false )
{
return false;
}
else
{
return true;
}
}
protected class ConnectToOntoceptAsync extends AsyncTask<String, Void, String>
{
@Override
protected String doInBackground( String... params )
{
try
{
stomp_client = new Client( "***.***.**.***", 61613, emailAsUserIdentifier, "1234" );
stomp_client.subscribe( topicName, Communicator.this );
isLoggedIn = true;
}
catch (LoginException e)
{
e.printStackTrace();
}
catch (IOException e)
{
e.printStackTrace();
}
return "Executed";
}
@Override
protected void onPostExecute(String text)
{
// TODO
}
}
public boolean hasError()
{
return this.hasError;
}
public String getError()
{
return this.errorMessage;
}
public void disconnect()
{
stomp_client.unsubscribe( this.topicName );
stomp_client.disconnect();
this.isLoggedIn = false;
}
public void sendMessageToOntocept( String _messageText )
{
this.messageToSend = _messageText;
new SendAsyncMessageToOntocept().execute();
}
private class SendAsyncMessageToOntocept extends AsyncTask<String, Void, String>
{
@Override
protected String doInBackground( String... params )
{
String unicode_formatted_message = converter.convertToUnicodeText( messageToSend );
Map<String, String> header = new HashMap<String,String>();
// * http://www.germane-software.com/software/Java/Gozirra/
header.put( "type", "text/plain" );
header.put( "sender", "user" );
stomp_client.send( topicName, unicode_formatted_message, header );
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
Thread.yield();
return "Executed";
}
@Override
protected void onPostExecute(String text)
{ }
}
public boolean hasNewMessage( String _messageRecipientIdentifier )
{
if ( _messageRecipientIdentifier.contains("quiz_activity") )
{
if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
{
return false;
}
else
{
return perCommunicatorUserMessageStackTbl.get("quiz_activity").existNewMessage();
}
}
else if ( _messageRecipientIdentifier.contains("vms_activity") ) // VisualizeMemorySpace 액티비티 약자
{
if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
{
return false;
}
else
{
return perCommunicatorUserMessageStackTbl.get("vms_activity").existNewMessage();
}
}
else // _messageRecipientIdentifier = Ontocept_Activity
{
if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
{
return false;
}
else
{
return perCommunicatorUserMessageStackTbl.get("ontocept_activity").existNewMessage();
}
}
}
public Message getNewMessage( String _messageRecipientIdentifier )
{
if ( _messageRecipientIdentifier.contains("quiz_activity") )
{
if ( perCommunicatorUserMessageStackTbl.get("quiz_activity") == null )
{
return null;
}
else
{
return perCommunicatorUserMessageStackTbl.get("quiz_activity").getNewMessage();
}
}
else if ( _messageRecipientIdentifier.contains("vms_activity") )
{
if ( perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
{
return null;
}
else
{
return perCommunicatorUserMessageStackTbl.get("vms_activity").getNewMessage();
}
}
else // _messageRecipientIdentifier = Ontocept_Activity
{
if ( perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
{
return null;
}
else
{
return perCommunicatorUserMessageStackTbl.get("ontocept_activity").getNewMessage();
}
}
}
public String convertToUnicodeText( String str )
{
StringBuffer ostr = new StringBuffer();
for ( int i=0; i<str.length(); i++)
{
char ch = str.charAt(i);
if ((ch >= 0x0020) && (ch <= 0x007e)) // Does the char need to be converted to unicode?
{
ostr.append(ch); // No.
}
else // Yes.
{
ostr.append("\\u") ; // standard unicode format.
String hex = Integer.toHexString(str.charAt(i) & 0xFFFF); // Get hex value of the char.
for(int j=0; j<4-hex.length(); j++) // Prepend zeros because unicode requires 4 digits
ostr.append("0");
ostr.append(hex.toLowerCase()); // standard unicode format.
//ostr.append(hex.toLowerCase(Locale.ENGLISH));
}
}
return (new String(ostr)); //Return the stringbuffer cast as a string.
}
private void storeMessageFromOntocept( String messageObjectString )
{
Message messageObject = MessageObjectGenerator.getInstance().parseMessageStringIntoMessageObject( messageObjectString );
if ( messageObject.getRecipient().contains("ontocept_activity") )
{
if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
{
MessageStack ms = new MessageStack();
ms.addMessage( messageObject );
this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
}
else
{
this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
}
}
else if ( messageObject.getRecipient().contains("vms_activity") )
{
if ( this.perCommunicatorUserMessageStackTbl.get("vms_activity") == null )
{
MessageStack ms = new MessageStack();
ms.addMessage( messageObject );
this.perCommunicatorUserMessageStackTbl.put( "vms_activity", ms );
}
else
{
this.perCommunicatorUserMessageStackTbl.get("vms_activity").addMessage( messageObject );
}
}
else
{
if ( this.perCommunicatorUserMessageStackTbl.get("ontocept_activity") == null )
{
MessageStack ms = new MessageStack();
ms.addMessage( messageObject );
this.perCommunicatorUserMessageStackTbl.put( "ontocept_activity", ms );
}
else
{
this.perCommunicatorUserMessageStackTbl.get("ontocept_activity").addMessage( messageObject );
}
}
}
public HashMap<String, MessageStack> getMessageStack()
{
return this.perCommunicatorUserMessageStackTbl;
}
}
最佳答案
这似乎是一个 Android 应用程序。因此要求应用始终处于 Activity 状态,否则与服务器的通信将中断,导致消息丢失。
您确定将从后台线程中调用 Listener 吗?要连接到服务器,您的 AsyncTask ConnectToOntoceptAsync 会运行并立即返回。在 Android 上,所有网络通信都必须在单独的线程上进行。在我看来,这看起来很可疑。
我会在后台线程上进行所有通信,因此 Stomp 客户端是 AsyncTask 的私有(private)字段。此外,我更喜欢使用阻塞接收方法,而不是使用 Listener 接口(interface),它提供更详细的控制。
关于java - Activemq Topic Consumer 消费某些消息失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26446285/