java - How to send/receive messages to/from Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar)?

Tags: java jms amqp azureservicebus qpid

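I am currently investigating how to connect to Azure Service Bus using Qpid JMS (qpid-jms-client-0.11.1.jar).

I created a demo Java application, SimpleSenderReceiver, that connects to a provisioned Azure Service Bus by following this guide (#link1). The code seems to work with a very old version of the Qpid JMS client (version 0.32). I am now trying to get it working with the latest stable release of Qpid JMS (qpid-jms-client-0.11.1.jar), so far without success. Looking at the Qpid JMS 0.11.1 documentation (#link2), you can see that the connectionfactory property in the properties file is specified differently than in version 0.32.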

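  • How should the connectionfactory property be set in the properties file?
  • How do I set up the Qpid JMS - Azure Service Bus demo to use the latest stable Qpid release?

I keep running into the following problem: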

731 [AmqpProvider:(1):[amqps://example-bus.servicebus.windows.net?transport.connectTimeout=60000]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. TrackingId:238849ced1em4cd3a093261372f4fc1e_G21, SystemTracker:gateway6, Timestamp:10/27/2016 8:16:23 AM [condition = amqp:internal-error]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:150)
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:147)
at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:251)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:771)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I have the following properties file, servicebus.properties:

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]

connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSsuiLI%3D&transport.connectTimeout=6000

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]

queue.myQueueLookup = queue1
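
A side note on the jms.password value above: it appears to be a key that has been URL-encoded so that characters such as /, + and = survive inside the URI query string (as %2F, %2B and %3D). A minimal sketch of producing such a value with the JDK follows, assuming Java 10 or later for the Charset overload of URLEncoder.encode. The class name ConnectionUriSketch, its method names, and the key abc/def+ghi= are made up for illustration and are not part of Qpid JMS.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Sketch only: class name, method names, and the sample key are placeholders.
final class ConnectionUriSketch {

    // Percent-encodes a value so it can be placed in a URI query string.
    static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8);
    }

    // Builds a connection URI shaped like the one in servicebus.properties.
    static String buildUri(String host, String policyName, String key) {
        return "amqps://" + host
                + "?jms.username=" + encode(policyName)
                + "&jms.password=" + encode(key);
    }

    public static void main(String[] args) {
        // "abc/def+ghi=" is a made-up key, not a real credential.
        System.out.println(buildUri("example-open-bus.servicebus.windows.net",
                "somePolicy", "abc/def+ghi="));
    }
}
```

The resulting string is what would go on the right-hand side of the connectionfactory.myFactoryLookup entry.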

I have the following class, SimpleSenderReceiver.java:

package com.demo.AzureTest;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;

public class SimpleSenderReceiver implements MessageListener {

    private static boolean runReceiver = false;
    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;
    private static Random randomGenerator = new Random();

    public SimpleSenderReceiver() throws Exception {
        // Configure JNDI environment
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, 
                   "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put(Context.PROVIDER_URL, "C://PATH//servicebus.properties");
        Context context = new InitialContext(env);

        // Look up ConnectionFactory and Queue
        ConnectionFactory cf = (ConnectionFactory) context.lookup("myFactoryLookup");
        System.out.println("lookup: " + context.lookup("myFactoryLookup"));
        System.out.println("cf:"+cf);
        Destination queue = (Destination) context.lookup("myQueueLookup");

        System.out.println("queue:" + queue);

        // Create Connection
        connection = cf.createConnection();
        System.out.println("connection :"+connection);

        // Create sender-side Session and MessageProducer
        sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        System.out.println("Session open.");

        sender = sendSession.createProducer(queue);
        System.out.println(sender.getDestination());
        System.out.println("sender:"+sender);

        if (runReceiver) {
            // Create receiver-side Session, MessageConsumer,and MessageListener
            receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            receiver = receiveSession.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
        }
    }

    public static void main(String[] args) {
        try {

            if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                runReceiver = false;
            }

            SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
            System.out.println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
            BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));

            while (true) {
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) {
                    simpleSenderReceiver.close();
                    System.exit(0);
                } else {
                    simpleSenderReceiver.sendMessage();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void sendMessage() throws JMSException {
        TextMessage message = sendSession.createTextMessage();
        message.setText("Hello from SIS Test AMQP message from Java JMSaaa");
        long randomMessageID = randomGenerator.nextLong() >>>1;
        message.setStringProperty("TenantId", "klant");
        message.setStringProperty("EventType", "bericht");
        message.setStringProperty("EventTypeVersion", "1.0");
        message.setStringProperty("MessageType", "DocumentMessage");
        message.setStringProperty("OperationType", "Create");
        message.setStringProperty("SourceSystem", "sis_sender");
        message.setStringProperty("EnterpriseKey", "sis_sender-klant-bericht");
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());
        System.out.println("Sent message with Text = " + message.getText());
    }

    public void close() throws JMSException {
        connection.close();
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = " + message.getJMSMessageID());
            TextMessage txtmessage = (TextMessage) message;
            System.out.println("Received message with Text = " + txtmessage.getText());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}  

Maven dependencies:

    <dependencies>
        <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>0.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.2</version>
        </dependency>
    </dependencies>

--- Update ---

Since then I have gotten a bit further, but I am still somewhat stuck. The updated connectionfactory property:

connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = amqps://example-open-bus.servicebus.windows.net?amqp.idleTimeout=150000&jms.username=somePolicy&jms.password=aM2k3PaZY5jdIkmGKm7G%2FcH%2BUFQaFAgHIYc3dSkuiLI%3D

I now get the following stack trace:

842 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
1014 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:543efe98-3ecc-485e-9f7f-3046c40db0cb:1 connected to remote Broker: amqps://example-open-bus-bus.servicebus.windows.net
1301 [AmqpProvider:(1):[amqps://example-open-bus-bus.servicebus.windows.net]] WARN org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder - Open of resource:(JmsProducerInfo { ID:546efe78-3ecc-485d-9f6f-3065c40db1ce:1:1:1, destination = klant }) failed: Attempted to perform an unauthorized operation. TrackingId:2950b1ed7a0d4e0a97b0k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
Caught exception, exiting.
javax.jms.JMSSecurityException: Attempted to perform an unauthorized operation. TrackingId:2890b0ed9a0d4e0a97b1k32b25434ac2_G10, SystemTracker:gateway6, Timestamp:10/27/2016 1:36:21 PM [condition = amqp:unauthorized-access]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:129)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:105)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:90)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

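Best answer

The newer client enables AMQP heartbeating / idle timeout by default, whereas the older client did not. The client sets a default timeout of 60 seconds, which means it requests an idle-timeout value of 30 seconds (30000 ms) in its AMQP Open frame when connecting to the server. This is spec-defined behaviour: peers advertise half of their real timeout to help avoid spurious timeouts.

ServiceBus rejects the 30000 ms Open frame value and says it requires a value of at least 60000 ms (or perhaps also 0, which means the timeout is disabled). To satisfy this, you need to configure the client with a timeout of at least 120000 ms, which results in the minimum Open frame idle-timeout value of 60000 ms that ServiceBus mandates (or, again, perhaps disable the client's timeout handling by setting it to 0).

You can do this with the "amqp.idleTimeout" URI option, as described at http://qpid.apache.org/releases/qpid-jms-0.11.1/docs/index.html#amqp-configuration-options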
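
The arithmetic above can be sketched as follows. This is only an illustration under the assumptions stated in this answer (the client advertises half of its configured timeout, and ServiceBus requires at least 60000 ms in the Open frame). IdleTimeoutSketch and its methods are hypothetical helpers, not Qpid JMS API; only the amqp.idleTimeout option name comes from the client documentation.

```java
// Sketch only: hypothetical helper, not part of the Qpid JMS client.
final class IdleTimeoutSketch {

    // Minimum Open frame value, taken from the ServiceBus error message.
    static final long SERVICE_BUS_MIN_OPEN_FRAME_MS = 60_000L;

    // Per the answer, the client advertises half of its configured timeout.
    static long advertisedIdleTimeout(long configuredMs) {
        return configuredMs / 2;
    }

    // Smallest client-side setting whose advertised half meets the minimum.
    static long minimumConfiguredIdleTimeout() {
        return SERVICE_BUS_MIN_OPEN_FRAME_MS * 2;
    }

    // Appends amqp.idleTimeout to a URI, respecting an existing query string.
    static String withIdleTimeout(String uri, long configuredMs) {
        String separator = uri.contains("?") ? "&" : "?";
        return uri + separator + "amqp.idleTimeout=" + configuredMs;
    }

    public static void main(String[] args) {
        // Client default of 60000 ms: advertised value is below the minimum.
        System.out.println(advertisedIdleTimeout(60_000L));
        // Value used in the question's update: advertised value is above it.
        System.out.println(advertisedIdleTimeout(150_000L));
        System.out.println(withIdleTimeout(
                "amqps://example-open-bus.servicebus.windows.net",
                minimumConfiguredIdleTimeout()));
    }
}
```

With the default of 60000 ms the advertised value is 30000 ms, which matches the value ServiceBus complained about; 120000 ms is the smallest setting that advertises 60000 ms.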

Edit: I see you figured this out while I was typing my answer.

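The new exception comes from ServiceBus and says that you are not authorized to perform the operation you are attempting. It should be easy to trace the exception back to where it is thrown and work out which operation that is; the WARN line in your log already shows that it was raised while opening a producer (JmsProducerInfo) on the destination klant.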

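Your URI looks fine (though I assume your username is not actually "somePolicy", and that the doubled connectionfactory.myFactoryLookup = connectionfactory.myFactoryLookup = at the start is a copy-and-paste error). I have not personally used the client with ServiceBus, but I have seen questions from many people who have, so I am not aware of any specific issue that would completely prevent the two from working together.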
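
If the doubled prefix were really present in the file, standard java.util.Properties parsing would yield a value that begins with "connectionfactory.myFactoryLookup = " rather than with a URI. A quick sanity check along these lines can catch that kind of paste error. This is a rough sketch: JndiPropertiesCheck is a hypothetical helper, it assumes the file follows java.util.Properties rules, and it only recognises amqp:// and amqps:// URIs, deliberately ignoring any other schemes the client may accept.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: hypothetical helper, not part of the Qpid JMS client.
final class JndiPropertiesCheck {

    // Returns the connectionfactory.* keys whose value is not an amqp(s) URI.
    static List<String> suspiciousFactories(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> suspicious = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith("connectionfactory.")) {
                continue; // queue.* and topic.* entries are not checked here
            }
            String value = props.getProperty(key).trim();
            if (!value.startsWith("amqp://") && !value.startsWith("amqps://")) {
                suspicious.add(key);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        String pasted = "connectionfactory.myFactoryLookup = "
                + "connectionfactory.myFactoryLookup = amqps://example.invalid";
        String clean = "connectionfactory.myFactoryLookup = amqps://example.invalid\n"
                + "queue.myQueueLookup = queue1";
        System.out.println(suspiciousFactories(pasted));
        System.out.println(suspiciousFactories(clean));
    }
}
```

The first call reports the factory entry as suspicious, while the second returns an empty list.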

Source: the original question on Stack Overflow: https://stackoverflow.com/questions/40281205/
