java - 从 Azure 服务总线队列接收 NULL 消息正文内容

标签 java azure azureservicebus azure-servicebus-queues

我正在尝试实现一个 Java 应用程序,用于向 Azure 服务总线队列发送消息和从 Azure 服务总线队列接收消息。与门户的连接和消息的发送进展顺利,但在接收消息并获取消息正文内容时,某些变量缺少其值“null”。我该如何解决这个问题?

这是我用来发送和接收消息的代码:

CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
    List<HashMap<String, String>> data =
            GSON.fromJson(
                    "[" +
                            "{'Device ID' = 'FieldPanel_L1'},"+
                            "{'Sensor1name' = 'FieldPanel_SL1', 'value1' = '0', 'Location of Sensor1 X' = '0.0', 'Location of sensor1 Y' = '0.0'}," +
                            "{'Sensor2name' = 'FieldPanel_SL2', 'value2' = '0', 'Location of Sensor2 X' = '20.0', 'Location of sensor2 Y' = '0.0'},"+
                            "{'Sensor3name' = 'FieldPanel_SL3', 'value3' = '0', 'Location of Sensor3 X' = '40.0', 'Location of sensor3 Y' = '0.0'}"+
                        "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

    List<CompletableFuture> tasks = new ArrayList<>();
    for (int i = 0; i < data.size(); i++) {
        final String messageId = Integer.toString(i);
        Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
        message.setContentType("application/json");
        message.setLabel("FieldPanel");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(2));
        System.out.printf("\nSending Message: Id = %s", message.getMessageId());
        tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\n\tAcknowledged Message: Id = %s", message.getMessageId());
                }));
    }
    return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
} // Method sendMessageAsync()

void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {

    // register the RegisterMessageHandler callback with executor service
    queueClient.registerMessageHandler(new IMessageHandler() {

    // callback invoked when the message handler loop has obtained a message
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        // received message is passed to callback
        if (message.getLabel() != null &&
                message.getContentType() != null &&
                message.getLabel().contentEquals("FieldPanel") &&
                message.getContentType().contentEquals("application/json")) {

                byte[] body = message.getBody();
                Map fieldPanel = GSON.fromJson(new String(body, UTF_8), Map.class);

                System.out.printf("\n\t\t\tMessage received: \n\t\t\t\t\tMessageId = %s, \n\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\tEnqueuedTimeUtc = %s," +
                "\n\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\tContent: [ Sensor 1 Name = %s, Value = %s, Location of Sensor1 X = %s, Location of sensor1 Y = %s, Sensor 2 Name = %s, Value = %s, Location of Sensor2 X = %s, Location of sensor2 Y = %s, Sensor 3 Name = %s, Value = %s, Location of Sensor3 X = %s, Location of sensor3 Y = %s ]\n",
                message.getMessageId(),
                message.getSequenceNumber(),
                message.getEnqueuedTimeUtc(),
                message.getExpiresAtUtc(),
                message.getContentType(),

fieldPanel != null ? fieldPanel.get("Sensor1name") : "ERROR" ,
fieldPanel != null ? fieldPanel.get("value1") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor1 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor1 Y") : "ERROR",
fieldPanel != null ? fieldPanel.get("Sensor2name") : "ERROR",
fieldPanel != null ? fieldPanel.get("value2") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor2 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor2 Y") : "ERROR",
fieldPanel != null ? fieldPanel.get("Sensor3name") : "ERROR",
fieldPanel != null ? fieldPanel.get("value3") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor3 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor3 Y") : "ERROR");
                } //Message Body
                return CompletableFuture.completedFuture(null);
            }

 // callback invoked when the message handler has an exception to report
 public void notifyException(Throwable throwable, ExceptionPhase   exceptionPhase) {
                  System.out.printf(exceptionPhase + "-" + throwable.getMessage());}
}, new MessageHandlerOptions(1, true,Duration.ofMinutes(1)),executorService);

         } // Method registerReceiver()

这是我在控制台上收到的消息输出:

Message received: 
    MessageId = 1, 
    SequenceNumber = 15, 
    EnqueuedTimeUtc = 2019-07-22T08:40:29.161Z,
    ExpiresAtUtc = 2019-07-22T08:42:29.161Z, 
    ContentType = "application/json",  
Content: [ Sensor 1 Name = FieldPanel_SL1, Value = 0, Location of Sensor1 X = 0.0, Location of sensor1 Y = 0.0, Sensor 2 Name = null, Value = null, Location of Sensor2 X = null, Location of sensor2 Y = null, Sensor 3 Name = null, Value = null, Location of Sensor3 X = null, Location of sensor3 Y = null ]

最佳答案

我检查了您的代码,发现您得到的输出是正确的。

在您的代码中,您创建一个包含 4 个 HashMap 的列表。将发送 4 条消息。每条消息都是设备的部分信息。

所以,在您收到的消息中,您每次只能获得一份。

我猜json中的所有信息都应该是针对一台设备的。您实际上想在一封邮件中发送它们。

以下是可能满足您要求的示例:

public class ServicebusBasicTest {

    static Gson GSON = new Gson();
    static String connectionString = "Endpoint=sb://ja**78.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=73**U=";

    static CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {

        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'Device ID' = 'FieldPanel_L1'}," +
                                "{'Sensor1name' = 'FieldPanel_SL1', 'value1' = '0', 'Location of Sensor1 X' = '0.0', 'Location of sensor1 Y' = '0.0'}," +
                                "{'Sensor2name' = 'FieldPanel_SL2', 'value2' = '0', 'Location of Sensor2 X' = '20.0', 'Location of sensor2 Y' = '0.0'}," +
                                "{'Sensor3name' = 'FieldPanel_SL3', 'value3' = '0', 'Location of Sensor3 X' = '40.0', 'Location of sensor3 Y' = '0.0'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType()
                );

        List<CompletableFuture> tasks = new ArrayList<>();

        final String messageId = Integer.toString(1);
        Message message = new Message(GSON.toJson(data, List.class).getBytes(Charset.forName("UTF-8")));
        message.setContentType("application/json");
        message.setLabel("FieldPanel");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(2));
        System.out.printf("\nSending Message: Id = %s\n", message.getMessageId());
        tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\nMessage Sent: Id = %s\n", message.getMessageId());
                }));

        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

    static void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {
        queueClient.registerMessageHandler(new IMessageHandler() {
                                               public CompletableFuture<Void> onMessageAsync(IMessage message) {
                                                   if (message.getLabel() != null &&
                                                           message.getContentType() != null &&
                                                           message.getLabel().contentEquals("FieldPanel") &&
                                                           message.getContentType().contentEquals("application/json")) {

                                                       List<HashMap<String, String>> fieldPanel = GSON.fromJson(new String(message.getBody(), Charset.forName("UTF-8")), new TypeToken<List<HashMap<String, String>>>() {}.getType());

                                                       System.out.printf(
                                                               "\nMessage received: \n -->MessageId = %s\n -->ContentType = %s",
                                                               message.getMessageId(),
                                                               message.getContentType()
                                                       );

                                                       System.out.print("\n -->Device info");
                                                       for ( HashMap<String,String> map: fieldPanel) {
                                                           for (Object object : map.entrySet()) {
                                                               Map.Entry entry = (Map.Entry) object;
                                                               System.out.printf("\n -----> %s : %s", entry.getKey(),entry.getValue());
                                                           }
                                                       }
                                                       System.out.println();
                                                   }
                                                   return queueClient.completeAsync(message.getLockToken());
                                               }

                                               public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                                                   System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                                               }
                                           },
                new MessageHandlerOptions(1, false, Duration.ofSeconds(5)),
                executorService
        );
    }

    public static void main(String[] args) throws Exception{
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, "testqueue"), ReceiveMode.PEEKLOCK);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        registerReceiver(receiveClient, executorService);

        QueueClient client = new QueueClient(new ConnectionStringBuilder(connectionString, "testqueue"), ReceiveMode.PEEKLOCK);
        sendMessagesAsync(client).join();

        Thread.sleep(5000);

        client.close();
        receiveClient.close();
        executorService.shutdown();
    }
}

控制台输出:

Sending Message: Id = 1

Message Sent: Id = 1

Message received: 
 -->MessageId = 1
 -->ContentType = application/json
 -->Device info
 -----> Device ID : FieldPanel_L1
 -----> Sensor1name : FieldPanel_SL1
 -----> Location of sensor1 Y : 0.0
 -----> Location of Sensor1 X : 0.0
 -----> value1 : 0
 -----> Location of Sensor2 X : 20.0
 -----> Location of sensor2 Y : 0.0
 -----> value2 : 0
 -----> Sensor2name : FieldPanel_SL2
 -----> Location of Sensor3 X : 40.0
 -----> Location of sensor3 Y : 0.0
 -----> value3 : 0
 -----> Sensor3name : FieldPanel_SL3

关于java - 从 Azure 服务总线队列接收 NULL 消息正文内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57142441/

相关文章:

c# - Intuit IPP 无法在 Azure 上运行

azure - 指南 OnMessageOptions.AutoRenewTimeout

azure - 如何仅向一个 Azure 服务总线订阅发送消息?

java - 使用 Java 在 Web 应用程序中集成 Paypal

azure - 将项目发布到 Azure 后出现内部服务器错误 (500)

java - 为什么java中的Object类中有public方法?

sql-server - 即使 SSMS 连接没有问题,也无法将 Excel 2016 连接到 Azure SQL 数据库

c# - session 的 Azure WebJob ServiceBusTrigger

java - Class.getConstructor函数参数

java - 试图在我的应用程序中集成一个拼写检查库