Java:Azure服务总线队列通过 session 接收消息

标签 java azure session queue azureservicebus

我正在用 Java 编写代码(使用 Azure SDK for Java),我有一个包含 session 消息的服务总线队列。我想接收这些消息并将它们处理到另一个地方。

我使用 QueueClient 连接到队列,然后使用 registerSessionHandler 处理消息(代码如下)。

问题是,每当收到消息时,我都可以打印有关该消息的所有详细信息(包括内容),但它会打印 10 次,并且每次都会打印异常。 (打印10次:我理解这是因为在将消息扔到死信队列并转到下一条消息之前有一个10次重试策略。)

异常说

> USERCALLBACK-Receiver not created. Registering a MessageHandler creates a receiver.

The output with the Exception

但我确信 SessionHandler 与 MessageHandler 执行相同的操作,但包括对 session 的支持,因此它应该创建一个接收器,因为它接收消息。我尝试过使用 MessageHandler 但它甚至无法工作并停止整个程序,因为它不支持 session 消息,而我收到的消息有 session 。

我的问题是理解异常想要我做什么,以及如何修复代码以便它不会给我任何异常?有人对如何改进代码有建议吗?或者其他做同样事情的方法?

QueueClient qc = new QueueClient(
            new ConnectionStringBuilder(connectionString),
            ReceiveMode.PEEKLOCK);

qc.registerSessionHandler(
            new ISessionHandler() {
                @Override
                public CompletableFuture<Void> onMessageAsync(IMessageSession messageSession, IMessage message) {
                    System.out.printf(
                            "\nMessage received: " +
                                    "\n --> MessageId = %s " +
                                    "\n --> SessionId = %s" +
                                    "\n --> Content Type = %s" +
                                    "\n --> Content = \n\t\t %s",
                            message.getMessageId(),
                            messageSession.getSessionId(),
                            message.getContentType(),
                            getMessageContent(message)
                    );

                    return qc.completeAsync(message.getLockToken());
            }
                @Override
                public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession iMessageSession) {
                    return CompletableFuture.completedFuture(null);
                }

                @Override
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.println("\n Exception " + exceptionPhase + "-" + throwable.getMessage());
                }
            },
            new SessionHandlerOptions(1, true, Duration.ofMinutes(1)),
            Executors.newSingleThreadExecutor()
);

(对于那些感兴趣的人来说,getMessageContent(message) 方法是一个单独的方法:)

public String getMessageContent(IMessage message){
    List<byte[]> content = message.getMessageBody().getBinaryData();
    StringBuilder sb = new StringBuilder();
    for (byte[] b : content) {
        sb.append(new String(b)
        );
    }
    return sb.toString();
}

最佳答案

对于那些想知道的人,我设法解决了问题!

这只是通过使用 Azure Functions ServiceBusQueueTrigger 完成的,然后它将监听服务总线队列并处理消息。通过将 isSessionsEnabled 设置为 true,它将按照我的意愿接受 session 消息:)

因此,不再编写 100 多行代码,现在的代码如下所示:

public class Function {

@FunctionName("QueueFunction")
public void run(
        @ServiceBusQueueTrigger(
               name = "TriggerName", //Any name you choose
               queueName = "queueName", //QueueName from the portal
               connection = "ConnectionString", //ConnectionString from the portal
               isSessionsEnabled = true
        ) String message,
        ExecutionContext context
) {
    // Write the code you want to do with the message here
    // Using the variable messsage which contains the messageContent, messageId, sessionId etc.
  }
}

关于Java:Azure服务总线队列通过 session 接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59213314/

相关文章:

php - 对 cookie 进行编码,使其不会被欺骗或读取等

java - GWT 在外部服务器上运行

c# - 使用 PHP 和 MVC 将 XML 文件上传到 Web 目录

java - 为什么我应该在 MVP GWT 项目中使用 ClientFactory?

azure - 如何将正确的范围传递给应用程序注册以查询azure管理服务

c# - 访问 Azure 嵌入式二进制资源(FileStream?)

php - 错误 310(net::ERR_TOO_MANY_REDIRECTS):

session - 每次刷新页面都会创建一个新的JSESSIONID,但并非所有浏览器都会创建

java - 比较两个 json 并从第一个 json java 返回唯一键

java - 有效的 Parens 方法 - Java