jms - 如何从 Websphere MQ 读取大量消息

标签 jms ibm-mq messaging mq

我想按顺序分组从 Websphere MQ 读取 10000 条消息,我正在使用下面的代码来执行相同的操作,但是读取所有消息需要很长时间。即使我尝试使用多线程概念,但有时 2 个线程正在消耗相同的组和竞争条件发生。下面是代码片段。 我正在尝试使用 3 个线程依次从 MQ 读取 10000 条消息,但我的两个线程同时访问同一组。如何避免这种情况?按顺序读取大量消息的最佳方法是什么?我的要求是我想按顺序阅读 10000 条消息。请帮忙。

MQConnectionFactory factory = new MQConnectionFactory();
factory.setQueueManager("QM_host")
MQQueue destination = new MQQueue("default");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

MessageConsumer lastMessageConsumer = 
    session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE");
TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait();
lastMessageConsumer.close();

if (lastMessage != null) {

    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
    String groupId = lastMessage.getStringProperty("JMSXGroupID");

    boolean failed = false;

    for (int i = 1; (i < groupSize) && !failed; i++) {

        MessageConsumer consumer = session.createConsumer(destination,
            "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
        TextMessage message = (TextMessage)consumer.receiveNoWait();

        if (message != null) {
            System.out.println(message.getText());
        } else {
            failed = true;
        }

        consumer.close();

    }

    if (failed) {
        session.rollback();
    } else {
        System.out.println(lastMessage.getText());
        session.commit();
    }

}

connection.close();

最佳答案

我认为更好的方法是在你的应用程序中有一个协调器线程,它会监听组的最后消息,并且每个组都会启动一个新线程来获取属于分配给该线程的组的消息。 (这将满足竞争条件。)

在获取属于一个组的消息的线程中,您不需要使用 for 循环分别获取每条消息,而是应该获取属于该组的任何消息,同时维护一个组计数器和缓冲订购信息。只要您仅在接收和处理该组的所有消息后才提交您的 session ,这将是安全的。 (这将产生更高的性能,因为每个组将由一个单独的线程处理,并且该线程将只访问 MQ 中的每条消息一次。)

关于jms - 如何从 Websphere MQ 读取大量消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33065968/

相关文章:

java - 不寻常的 Java 行为 - 为什么这有效?

java - JMSListener - 动态选择器

记录 MQ 消息

ibm-mq - IBM.WMQ.MQQueueManager 的类型初始值设定项引发异常

javascript - 如何知道 Firefox 插件中哪个内容脚本与后台进行通信?

JAVA 消息服务

java - JMS TransactionRolledbackLocalException 导致 GlassFish 崩溃

java - 在大型机中使用 JMS 从 java 连接到 MQ

php - 用于 PHP/CodeIgniter 的线程消息传递系统?

jms - FOSRestbundle、JMS Serializer 和 SonataMediaBundle 返回图像的公共(public) url