java - 如何使用 Java 将错误消息移动到 Azure 死信队列?

标签 java azure azureservicebus azure-servicebus-queues

我们使用 Azure 服务总线队列在不同系统之间交换消息。我们希望使用 Java 代码将无效消息移至死信队列。

我可以将消息移至主队列,但不能移至死信队列。我尝试将队列名称指定为“BasicQueue/$deadletterqueue”,但出现错误。

最佳答案

Azure 服务总线运行时可能会自动将消息移至死信队列的原因有很多,例如:

  • 超出最大投递次数(启用时默认为 10);
  • 超出消息发送时间 (TTL);
  • 等等;

欲了解更多详情,请访问 https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues ,包括消息被移至死信队列的完整原因列表。

此外,应用程序可以将消息移至死信队列。在下面查找示例 Java 8 控制台应用程序代码和 pom.xml 文件。

请注意,将消息移动到死信队列时,Azure 服务总线运行时会自动填充以下属性,最好在应用程序执行相同操作时也提供这些属性:

DeadLetterReason

DeadLetterErrorDescription

-- Java 版本 --

java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)

--- pom.xml 的 Azure 服务总线依赖项 ---

<dependencies>
    <!-- https://mvnrepository.com/artifact/com.microsoft.azure/azure-servicebus -->
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-servicebus</artifactId>
        <version>2.0.0-preview1</version>
    </dependency>
</dependencies>

-- Java 8 控制台应用程序 --

import java.time.Duration;
import java.util.Scanner;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;

public class Main {
    public static void main(String[] args)
    {
        final String CONNECTION_STRING = "{Azure Service Bus Connection String}";
        final String QUEUE_NAME = "{Azure Service Bus Queue Name}";

        try {
            moveMessageToDlq(CONNECTION_STRING, QUEUE_NAME);
        }catch (Exception ex) {
            System.out.println(String.format("An exception occurred. Details: %s.", ex.toString()));
        }
    }

    private static void moveMessageToDlq(String connectionString, String queueName)
            throws ServiceBusException, InterruptedException {

        ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(connectionString, queueName);
        QueueClient queueClient = new QueueClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);

        MessageHandler messageHandler = new MessageHandler(queueClient);

        MessageHandlerOptions messageHandlerOptions = new MessageHandlerOptions(
                1,
                false,
                Duration.ofMinutes(1));

        queueClient.registerMessageHandler(messageHandler, messageHandlerOptions);

        final String QUIT_COMMAND_NAME = "quit";
        Scanner scanner = new Scanner(System.in);

        while (true) {
            System.out.println(String.format("Enter '%s' and press <ENTER> to exit...", QUIT_COMMAND_NAME));
            String input = scanner.nextLine();

            if (input.equalsIgnoreCase(QUIT_COMMAND_NAME)){
                System.out.println("Exiting...");
                break;
            }
        }

        scanner.close();

        queueClient.close();
    }
}

--- 消息处理程序 ---

import java.util.concurrent.CompletableFuture;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.IQueueClient;

public class MessageHandler implements IMessageHandler {
    private final IQueueClient _client;

    public MessageHandler(IQueueClient client) {
        if (client == null){
            throw new IllegalArgumentException("Queue client cannot be null.");
        }
        _client = client;
    }

    @Override
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        System.out.println(String.format("Received message id '%s' (DeliveryCount=%d).",
                message.getMessageId(),
                message.getDeliveryCount()));

        if (message.getLabel().equalsIgnoreCase("dlq")){ // Send message to DLQ if label id dlq
            System.out.println("Sending message to the dead letter queue...");

            return _client.deadLetterAsync(
                    message.getLockToken(),
                    "InvalidMessage", // DeadLetterReason
                    "Message invalid due to..."); // DeadLetterErrorDescription
        }
        else { // Otherwise, complete message
            System.out.println("Completing message...");

            return _client.completeAsync(message.getLockToken());
        }
    }

    @Override
    public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
        System.out.println("An exception occurred. Details: " + exceptionPhase + "-" + throwable.getMessage());
    }
}
<小时/>

GitHub Azure 服务总线 Java 示例: https://github.com/Azure/azure-service-bus-java/tree/e3d163eac92213d34dce059f3353d2c819d31fbf .

关于java - 如何使用 Java 将错误消息移动到 Azure 死信队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50055927/

相关文章:

powershell - 如何从 Powershell 脚本同步 Azure 应用服务 git bash?

powershell - 使用 powershell 从 azure 存储导入发布设置文件

java - 在 Android 应用程序中同步数据

java - 使用java下载WSDL文件

java - 在 OSGi/Equinox 上监控

c# - 在服务总线后面验证过期的 JWT token

c# - MassTransit 在 Azure 服务总线上创建队列

java - 删除方法中的引用并更改静态变量

azure - 如何集成测试Azure Web Jobs?

azure - 在消息服务总线中发送 XML 并在逻辑应用中解析它