我试图在 Active MQ(版本 5.15.0)中创建消费者级别的超时。考虑一条消息被消费者选择但无法确认,所以在这种情况下我希望消费者超时,以便其他消费者可以选择相同的消息来收听代理。
我设置两个消费者监听器的生产者代码:
public class JmsMessageListenerAckExample {
public static void main(String[] args) throws URISyntaxException, Exception {
Connection connection = null;
try {
// Producer
ConnectionFactory factory = createActiveMQConnectionFactory();
connection = factory.createConnection();
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
// Consumer
MessageConsumer consumer1 = session.createConsumer(queue);
consumer1.setMessageListener(
new AckMessageListener(false, "consumer1"));
Thread.sleep(1000);
System.out.println("Creating new message listener to acknowledge");
producer.send(msg);
MessageConsumer consumer2 = session.createConsumer(queue);
consumer2.setMessageListener(
new AckMessageListener(true, "consumer2"));
connection.start();
Thread.sleep(3000);
session.close();
} finally {
if (connection != null) {
connection.close();
}
}
}
private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
// Create a connection factory.
final ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616");
// Pass the username and password.
connectionFactory.setUserName("user");
connectionFactory.setPassword("user");
return connectionFactory;
}
}
这是我的消费者监听器:
public class AckMessageListener implements MessageListener {
private boolean acknowledge;
private String consumerName;
public AckMessageListener(boolean acknowledge, String consumerName) {
this.acknowledge = acknowledge;
this.consumerName = consumerName;
}
public void onMessage(Message message) {
boolean terminate = !acknowledge;
try {
System.out.println("ConsumerName="+consumerName+", Acknowledge="+acknowledge);
if (acknowledge) {
try {
message.acknowledge();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
System.out.println(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (terminate) {
Thread.currentThread().interrupt();
}
}
}
}
我想以 consumer1
收听消息但不确认的方式进行模拟,因此超时 我正在尝试释放线程,我期待我的 consumer2
拿起它并确认消息,以便消息在代理中从“消息入队”状态移动到“消息出队”状态,但我的 consumer2
无法接收任何消息事件。
我做错了什么吗?如何使用 Active MQ 实现消费者级别的超时?
最佳答案
处理此问题的一种方法是使用事务 (http://activemq.apache.org/how-do-transactions-work.html)。您在成功时调用 commit(),在失败时调用 rollback(),或者如果 session 在调用 commit() 之前关闭,则将发生重新传送 (http://activemq.apache.org/message-redelivery-and-dlq-handling.html)。
关于java - ActiveMQ 消费者级别超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52618903/