java - ServiceBus over AMQP 关闭消费者

标签 java amqp servicebus azureservicebus qpid

我们本地安装了 ServiceBus 和 Java JMS QPID 客户端 0.26。 SB 中似乎存在错误 - 当生产者关闭连接时,它会向消费者发送 END 命令。生产者和消费者必须运行在同一台机器上。

首先启动监听队列的消费者:

static void consumeFromQueueForStackOverflow(Connection connection, Context context)
        throws JMSException, NamingException, InterruptedException {
    Session session = null;
    MessageConsumer consumer = null;
    long RECEIVE_TIMEOUT_MS = 30000;

    try {
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Queue queue = (Queue) context.lookup("JBA_QUEUE");
        consumer = session.createConsumer(queue);

        connection.start();

        int consumed = 0;
        while (true) {
            long startMS = System.currentTimeMillis();
            Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
            if (message != null) {
                consumed++;
                message.acknowledge();
                continue;
            }

            long durationMS = System.currentTimeMillis() - startMS;
            if (durationMS < RECEIVE_TIMEOUT_MS) {
                log.info(String.format(
                        "Connection is closed, timeout: %d[ms], waited: %s[ms] (consumed: %d)",
                        RECEIVE_TIMEOUT_MS, durationMS, consumed));

                break;
            }

            log.info(String.format("Receive timeout, retrying (consumed: %d)", consumed));
            consumed = 0;
        }
    } finally {
        connection.stop();

        if(null != consumer)
            consumer.close();
        if(null != session)
            session.close();
    }
}

然后将 1 条消息发布到队列:

static void publishToQueueForStackOverflow(Connection connection, Context context)
        throws JMSException, NamingException, InterruptedException {
    Session session = null;
    MessageProducer producer = null;

    try {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = (Queue) context.lookup("JBA_QUEUE");

        producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        TextMessage message = session.createTextMessage("My Message");
        producer.send(message);
    } finally {
        if(producer != null)
            producer.close();
        if(session != null)
            session.close();
    }

    log.info(String.format("Sent %d messages", count));
}

一旦生产者进程结束,消费者也将结束,等待时间小于超时时间。 receive(timeout) 方法在超时到期之前返回 null,这意味着 (javadoc)“消息使用者同时关闭”。或者,您可能会看到 IllegalStateException:在调用 recognize() 期间已关闭。

这是来自消费者的日志。您可以看到服务器向客户端发送了意外的 End{}:

RECV[5671|0] : Open{containerId=087d0b7b8a8e4809a686f8b20d5376f5_GPRGXIT002,maxFrameSize=65536,channelMax=255,idleTimeOut=240000}
SEND[5671|0] : null
SEND[5671|0] : Begin{nextOutgoingId=0,incomingWindow=2048,outgoingWindow=2048,handleMax=4294967295}
SEND[5671|0] : Attach{name=IntegrationServiceBus/jba_testing_queue-> (48ff030e-c7be-42b2-9c22-4d0db13aec8f),handle=0,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=IntegrationServiceBus/jba_testing_queue,durable=none,expiryPolicy=link-detach},target=Target{}}
RECV[5671|0] : Begin{remoteChannel=0,nextOutgoingId=1,incomingWindow=2048,outgoingWindow=2048,handleMax=7}
RECV[5671|0] : Attach{name=IntegrationServiceBus/jba_testing_queue-> (48ff030e-c7be-42b2-9c22-4d0db13aec8f),handle=0,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=IntegrationServiceBus/jba_testing_queue,durable=none,expiryPolicy=link-detach},target=Target{},initialDeliveryCount=0,maxMessageSize=262144,properties={com.microsoft:tracking-id=087d0b7b8a8e4809a686f8b20d5376f5_GPRGXIT002_BPRGXIT003;2411:54:55}}
SEND[5671|0] : Flow{nextIncomingId=1,incomingWindow=2048,nextOutgoingId=0,outgoingWindow=2048,handle=0,deliveryCount=0,linkCredit=100,drain=false,echo=false}
RECV[5671|0] : Transfer{handle=0,deliveryId=0,deliveryTag=\x84\xb8.\xf5\xda3\xafF\x89<J\x1bj\xda{<,messageFormat=0,more=false,batchable=true}
RECV[5671|0] : End{}
SEND[5671|0] : Detach{handle=0}
SEND[5671|0] : Disposition{role=receiver,first=0,last=0,settled=true,state=Released{}}
SEND[5671|0] : End{}
SEND[5671|0] : End{}
SEND[5671|0] : Close{}
RECV[5671|0] : Close{}

最佳答案

这是broker的bug,解决方法是使用sync_publish='all',详情参见ServiceBus over AMQP/QPID client closes consumer

关于java - ServiceBus over AMQP 关闭消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22046467/

相关文章:

java - 来自服务实现的 Google Web Toolkit 异步调用

java - PHP 和 Java 中短字符串的安全加密

spring - channel 关闭 : channel error

java - 如何返回不同的数组?

RabbitMQ:由交换确认的消息,没有任何现有的绑定(bind)

java - OpenNMS v18 AMQP 消息发送问题

.net - 用于 Windows Server 事务/错误处理的服务总线 1.0

java - 在 Android 上生成 Azure SAS token

Azure 服务总线 - 使用 BrokeredMessage.GetBody 读取 .NET Core 2 发送的消息

java - subscribeOn 新线程有时不返回