spring-integration - Spring云流从kinesis收到的消息中的特殊字符

标签 spring-integration spring-cloud spring-cloud-stream spring-integration-aws

当我使用来自 kinesis 流的消息时。我收到一些带有标题等的垃圾字符

    @StreamListener(Processor.INPUT)
    public void receive(String message) {       
        System.out.println("Message recieved: "+message);
        throw new RuntimeException("Exception thrown");
    }

    @StreamListener("errorChannel")
    public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {      

        //original paylaod 
        System.out.println("Error Oiginal Message Payload"+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
        System.out.println("Error Original Message Stream channel"+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
    }

应用程序 yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: abcd
          destination: stream
          content-type: application/json
          errorChannelEnabled: true
          consumer:
            headerMode: raw

我在监听器和 errorChannel 上都得到带有垃圾字符的输出

我正在尝试提取 errorChannel 中的原始消息。这是转换字节消息的正确方法吗?

Message recieved: ?contentType "application/json"{"aa":"cc"}

最佳答案

AWS Kinesis 不提供任何 header 实体。因此,为了利用 Spring Cloud Stream 中的此类功能,我们将 header 嵌入到 Kinesis 记录的正文中。为此,Kinesis Binder 中的 headerMode 默认为 embeddedHeaders。为了保证生产者和消费者之间的对称性,不得更改此选项。

该框架为目标 @StreamListener channel 提供了开箱即用的 EmbeddedHeadersChannelInterceptor,并且嵌入的 header 被提取并正确填充到要发送的消息中。

当我们处理 errorChannel 中的错误时,我们确实有一个 errorMessage.getOriginalMessage() 作为未转换的原始消息。因此,该消息的有效负载是来自包含嵌入 header 的记录正文的byte[]

如果您想正确解析它们。你应该使用实用程序:

EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);

关于spring-integration - Spring云流从kinesis收到的消息中的特殊字符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49284437/

相关文章:

java - 哪种类型的 Spring 集成 channel ?

java - TCP Socket 的连接池和多线程?

java - Spring Cloud 上的 KafkaHeaders.RECEIVED_MESSAGE_KEY 与 KafkaHeaders.MESSAGE_KEY header

spring - SpringCloudGateway-记录传入的请求URL和相应的路由URI

spring-cloud - 使用新版本 Chelsea.RC1 的 "condition paramter header"@StreamListener 时出错

Java - 如何在 SpringCloud 和 RabbitMQ 中从单个发布者处进行多个订阅

java - Spring Integration中文件的处理组

spring - 消息转换异常 : Failed to convert message content

apache - 通过 Zuul 和 Apache 访问代理的资源服务器中的远程 IP 地址

java - 如何在我的 Java 应用程序中使用 Amazon Web Services 策略声明?