java - ActiveMQ 消费者级别超时

标签 java jms activemq producer-consumer

我试图在 Active MQ(版本 5.15.0)中创建消费者级别的超时。考虑一条消息被消费者选择但无法确认,所以在这种情况下我希望消费者超时,以便其他消费者可以选择相同的消息来收听代理。

我设置两个消费者监听器的生产者代码:

public class JmsMessageListenerAckExample {
  public static void main(String[] args) throws URISyntaxException, Exception {
    Connection connection = null;
    try {
      // Producer
      ConnectionFactory factory = createActiveMQConnectionFactory();
      connection = factory.createConnection();
      Session session = connection.createSession(false,
          Session.CLIENT_ACKNOWLEDGE);
      Queue queue = session.createQueue("customerQueue");
      String payload = "Important Task";
      Message msg = session.createTextMessage(payload);
      MessageProducer producer = session.createProducer(queue);

      System.out.println("Sending text '" + payload + "'");
      producer.send(msg);

      // Consumer
      MessageConsumer consumer1 = session.createConsumer(queue);
      consumer1.setMessageListener(
          new AckMessageListener(false, "consumer1"));
      Thread.sleep(1000);
      System.out.println("Creating new message listener to acknowledge");
      producer.send(msg);
      MessageConsumer consumer2 = session.createConsumer(queue);
      consumer2.setMessageListener(
          new AckMessageListener(true, "consumer2"));
      connection.start();

      Thread.sleep(3000);
      session.close();
    } finally {
      if (connection != null) {
        connection.close();
      }
    }
  }

  private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
    // Create a connection factory.
    final ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");

    // Pass the username and password.
    connectionFactory.setUserName("user");
    connectionFactory.setPassword("user");
    return connectionFactory;
  }
}

这是我的消费者监听器:

public class AckMessageListener implements MessageListener {
  private boolean acknowledge;
  private String consumerName;

  public AckMessageListener(boolean acknowledge, String consumerName) {
    this.acknowledge = acknowledge;
    this.consumerName = consumerName;
  }

  public void onMessage(Message message) {
    boolean terminate = !acknowledge;
    try {

      System.out.println("ConsumerName="+consumerName+", Acknowledge="+acknowledge);
      if (acknowledge) {
        try {
          message.acknowledge();
        } catch (JMSException e1) {
          e1.printStackTrace();
        }
      }

      System.out.println(message);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      if (terminate) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

我想以 consumer1 收听消息但不确认的方式进行模拟,因此超时 我正在尝试释放线程,我期待我的 consumer2拿起它并确认消息,以便消息在代理中从“消息入队”状态移动到“消息出队”状态,但我的 consumer2 无法接收任何消息事件。

我做错了什么吗?如何使用 Active MQ 实现消费者级别的超时?

最佳答案

处理此问题的一种方法是使用事务 (http://activemq.apache.org/how-do-transactions-work.html)。您在成功时调用 commit(),在失败时调用 rollback(),或者如果 session 在调用 commit() 之前关闭,则将发生重新传送 (http://activemq.apache.org/message-redelivery-and-dlq-handling.html)。

关于java - ActiveMQ 消费者级别超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52618903/

相关文章:

java - 将现有的 JAR 转换为 OSGi 包

java - ActiveMQ接收消息的不同方式

java - 配置activemq的授权

java - 企业 Web 应用程序中的缓存

java - Android 中的真实手机号码验证

java - 如何迭代大型 ActiveMQ 队列?

java - JMS Receive 如何在内部工作?

jms - ActiveMQ - 通过命令行删除/清除所有队列

spring - apache camel 路由队列问题

java - 程序不访问扩展 JPanel 类的方法 paintComponent()