我正在使用 spring.cloud 连接到 Java 中的 Azure 服务总线。这是我正在使用的 Maven 依赖项:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
<version>4.5.0</version>
</dependency>
我能够将队列中的消息作为字节数组使用,并将消息转换为字符串。这是我从队列接收消息后的主要代码:
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.block();
}
这是我的 JSON 示例数据的简短版本:
{
"serverId": 123,
"message": "{some message}"
}
我想做的是创建一个像这样的Java对象:
public class ExampleMessage {
private final Integer serverId;
private final String message;
当队列中的消息被使用时,它会将消息转换为我的 Java 对象。我习惯于使用 DataTypeProvider 来使用自定义 Java 对象进行 AMQP 消息消费,这将在幕后进行转换和验证转换。 spring.cloud.azure 是否有内置的反序列化方法/功能?或者我是否手动反序列化并验证已使用的消息?
最佳答案
在这里,我能够使用 Gson 将 Json 对象转换为 java 对象。类。
我只是从服务总线读取消息并将其转换为 java 对象。
我的pom.xml(依赖项)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.11.RELEASE</version>
</dependency>
我的测试课:
public class TestClass {
public String name ;
public int version;
TestClass(String n , int v)
{
this.name = n ;
this.version = v ;
}
}
主类:
@SpringBootApplication
public class ServicebustestApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ServicebustestApplication.class, args);
String conn = " Endpoint=sb://v-mganorkarjsonobject.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=HglLVGlgMsYZGQMOtUfp4g2oka1CpCbVR0YEHgly7jA= ";
CountDownLatch countdownLatch1 = new CountDownLatch(1);
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<Your COnnection String >")
.processor()
.queueName("test")
.processMessage(ServicebustestApplication::processMessage)
.processError(context -> processError(context,countdownLatch1))
.buildProcessorClient();
processorClient.start();
TimeUnit.SECONDS.sleep(10);
processorClient.close();
}
private static void processMessage(ServiceBusReceivedMessageContext context) {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
message.getSequenceNumber(), message.getBody());
Gson gson = new Gson();
TestClass testobject = gson.fromJson(String.valueOf(message.getBody()),TestClass.class);
System.out.println("name: "+testobject.name +" version: "+ testobject.version+"");
}
private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) {
}
}
这里处理消息的回调将处理消息,然后我们可以使用GSON将json字符串转换为java对象。
Gson gson = new Gson();
TestClass testobject = gson.fromJson(String.valueOf(message.getBody()),TestClass.class);
代码输出:
关于java - 如何在 Java 中使用 spring.cloud 反序列化/序列化 Azure 服务总线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74775900/