avro - 尝试在 Spring Cloud Stream 中解码 Avro 消息时出现无法识别的 header 字节错误

标签 avro spring-cloud-stream confluent-schema-registry

我正在尝试为我的 Spring Cloud Stream 应用程序编写一个测试用例。我将 Confluence Schema RegistryAvro 结合使用,因此我需要在从 channel 轮询后对消息进行解码。这是我的代码:

    processor.input()
        .send(MessageBuilder.withPayload(InputData).build());

    Message<?> message = messageCollector.forChannel(processor.output()).poll();

    BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
    OutputData outputObject = decoder.decode((byte[]) message.getPayload());

由于某种原因,此代码抛出

org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x08

我不确定这是否是我面临的某种错误,或者我没有遵循正确的方法来解码收到的 avro 消息。我怀疑我需要用某些东西设置标题,但我不太确定如何以及具体用什么设置。如果有人能帮助我解决这个问题,我将不胜感激。

P.S:我使用 spring-cloud-stream-test-support 来进行此测试。

最佳答案

使用测试绑定(bind)器时,数据不会进行 avro 编码。

测试 Binder 非常有限。

要使用 avro 正确进行端到端测试,您应该删除测试绑定(bind)器并使用带有嵌入式 kafka 代理的真实 kafka 绑定(bind)器。

sample apps 之一展示了如何做到这一点。

关于avro - 尝试在 Spring Cloud Stream 中解码 Avro 消息时出现无法识别的 header 字节错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57195310/

相关文章:

hadoop - 简单计数查询超出 Impala 内存限制

python - 使用 python 将 CSV 转换为 AVRO

java - 如何使用 Avro 生成 String 类型的字段而不是 CharSequence?

java - 响应式(Reactive)@StreamListener

java - 未使用 Spring-Cloud-Stream 创建 RabbitMQ Exchange

java - 在从 Kafka 代理获取数据之前,消费者如何检查架构注册表是否可访问?

go - 通过golang将消息以avro格式推送到kafka

java - kafka Avro 多个主题的消息反序列化器

java - 如何使用 Avro 序列化器和模式注册表向 Kafka 发送消息

reactive-programming - 如何以一种非常通用的方式将有效负载 react 性地发送到 Kafka 主题?