java - 如何将消息发送到 IBM MQ 集群中不同队列管理器和主机名中托管的不同队列

标签 java apache-camel ibm-mq

我基于 Apache-camel 的应用正在使用来自 IBM 队列之一的消息,例如下面是连接工厂的详细信息

hostname=host1000
QManager=QM1000
Port="some port"
Channel="common channel"

Camel 流消费和处理并将响应发送到来自消息头的 ReplyQueue。

 from(wmq:queue:<INPUT_QUEUE>)
.bean("processBean")
.bean("beanToSendMsgToReplyQueue")

在 Camel header 中,我低于 JMSReplyQueue。您可以看到它是一个不同的队列管理器,这个队列管理器来自不同的主机,但在集群环境中。

JMSReplyTo = queue://QM1012/TEST.REPLY?targetClient=1

队列管理器也介于两者之间。喜欢

queue://<queue-manager>//<queue-name>?<other parameters>

下面是我在发送消息时遇到的异常。

ERROR o.apache.camel.processor.DefaultErrorHandler:215 - Failed delivery for (MessageId: ID-xxxxxxxxx-0-4 on ExchangeId: ID-xxxxxx-42443-1492594420697-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: wmq://queue://QM1012/TEST.REPLY?targetClient=1 due to: Failed to resolve endpoint: wmq://queue://TAP2001R5/TEST?targetClient=1 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{targetClient=1}]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[sendTo(Endpoint[wmq://queue:BACKOUT_Q])], Channel[DelegateSync[com.xxx.yyy.listener.XXXOnExceptionProcessor@21c66ee4]], Channel[Stop]]]]

任何人都可以帮助我将消息发送到不同主机中的不同队列管理器队列,但它们都在同一个集群中。队列管理器名称也出现在字符串中间,那么如何解决这个问题。 如果您需要更多详细信息,请告诉我。

更新-1: 尝试使用相同的队列管理器且不带参数

JMSReplyTo = queue://QM1000/QUEUE_V1 我得到以下异常

org.springframework.jms.InvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2189' ('MQRC_CLUSTER_RESOLUTION_ERROR').

Update-2

我可以使用普通的 javax.jms.* 和 com.ibm.mq.jms.* api 向 JMSReplyTo 发送消息,但不能通过 Apache camel。 Camel 用户/开发人员组的任何人都可以帮助我使用 camel 组件处理相同的问题。

@Override
public void process(Exchange exchange)
    throws Exception {

    QueueConnection m_connection = this.connectionFactory.createQueueConnection();
    //m_connection.start();
    boolean transacted = false;

    QueueSession session = m_connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText(exchange.getIn().getBody());
    MQQueue mq = new MQQueue(
        "queue://QM1012/TEST.REPLY");
    QueueSender queueSender = session.createSender((MQQueue) mq);
    queueSender.send(outMessage);

    /* producerTemplate.send("wmq:" + "queue://QM1012/TEST.REPLY", exchange); */
}

最佳答案

您想要与两个不同的队列管理器通信,因此您需要相应地定义两个 Camel JMS 组件实例。 Camel 无法神奇地知道 QM1000 或 QM1012 的含义以及如何访问 QM。

您首先需要两个 WMQ QM 的两个 JMS 连接工厂实例。如何获取这些取决于你的执行环境。在 JEE 服务器上,可以在配置后使用 JNDI 访问连接池。查看有关如何设置 JMS 池的应用服务器文档。如果您独立运行,请查看 Spring JMS 连接缓存或 Atomikos(如果您需要 XA 事务)。 假设 QM1000 的 CF 是 sourceCF,QM1012 的 CF 是 targetCF。

现在您可以定义两个 Camel JMS 组件实例,每个 QM 一个。将连接工厂注入(inject) JMS 组件 (.setConnectionFactory(...))。 假设您定义了一个 id 为“jmssource”的 Camel JMS 组件,注入(inject) sourceCF。在 JMS 组件 ID“jmstarget”中注入(inject) targetCF。 如何做到这一点取决于您的环境(JEE/CDI、Spring、纯 Java)。看看 stackoverflow,有例子。

您现在可以使用以下语法在 Camel 路由上指定 Camel JMS 生产者和消费者:

.from("jmssource:INPUT_QUEUE")
  ...
  (do some processing)
  ...
  .to("jmstarget:QUEUE_V1")

您不能使用 Camel 的 JMS 回复逻辑(使用 JMSReplyTo header )来回复另一个队列管理器。我认为这是 JMS 标准不允许的。您需要通过发送到回复队列来明确回复。

对于设置 targetClient 选项,目标解析器可能很有用:

import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQDestination;

public class WMQDestinationResolver extends DynamicDestinationResolver implements DestinationResolver {
  private int targetClient = JMSC.MQJMS_CLIENT_JMS_COMPLIANT;

  public void setTargetClient(int targetClient) {
    this.targetClient = targetClient;
  }

  public Destination resolveDestinationName(Session session, String destinationName, boolean isPubSubDomain) throws JMSException {
    Destination destination = super.resolveDestinationName(session, destinationName, isPubSubDomain);
    if (destination instanceof MQDestination) {
      MQDestination mqDestination = (MQDestination) destination;
      mqDestination.setTargetClient(targetClient);
    }
    return destination;
  }
}

关于java - 如何将消息发送到 IBM MQ 集群中不同队列管理器和主机名中托管的不同队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43525900/

相关文章:

ibm-mq - WebSphere MQ 中的 MQRC 资源问题

ibm-mq - WebSphere MQ q 程序读取/写入文件

java - jdk 1.8 String.equalsIgnoreCase 中的重复空检查

java - 如果 Java 方法深度复制,则无需测试即可确定的一般规则是什么?

java - 如何在 Apache Camel DSL 中回滚事务?

java - 如何从处理器内的 Apache Camel 中的路由获取响应并在处理器内调用该路由?

java - Grails + Struts 瓷砖

java - 将 GWT 部署到 JBoss 时出现 web.xml 问题

java - 在 apache Camel 中压缩和解压缩大文件而不将整个文件加载到内存中

java - websphere MQ 消息的格式是什么