java - Spring-Cloud-Stream-Kafka-Binder 功能风格忽略自定义 De/Serializer 和/或 useNativeEncoding?

标签 java spring-boot apache-kafka spring-cloud-stream

我们刚刚升级到 Spring-Cloud-Stream 3.0.0 版本,遇到以下问题:

当使用这样的函数式风格时:

public class EventProcessor {

    private final PriceValidator priceValidator;

    @Bean
    public Function<Flux<EnrichedValidationRequest>, Flux<ValidationResult>> validate() {
        return enrichedValidationRequestFlux -> enrichedValidationRequestFlux
                .map(ProcessingContext::new)
                .flatMap(priceValidator::validateAndMap);
    }
}

application.yaml 如下所示:

spring.cloud.stream:
  default-binder: kafka
  kafka:
    binder:
      brokers: ${kafka.broker.prod}
      auto-create-topics: false
  function.definition: validate

# INPUT: enrichedValidationRequests
spring.cloud.stream.bindings.validate-in-0:
  destination: ${kafka.topic.${spring.application.name}.input.enrichedValidationRequests}
  group: ${spring.application.name}.${STAGE:NOT_SET}
  consumer:
    useNativeDecoding: true


spring.cloud.stream.kafka.bindings.validate-in-0:
  consumer:
    configuration:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: de.pricevalidator.deserializer.EnrichedValidationRequestDeserializer


# OUTPUT: validationResults
spring.cloud.stream.bindings.validate-out-0:
  destination: validationResultsTmp
  producer:
    useNativeEncoding: true

spring.cloud.stream.kafka.bindings.validate-out-0:
  producer:
    compression.type: lz4
    messageKeyExpression: payload.offerKey
    configuration:
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: de.pricevalidator.serializer.ValidationResultSerializer

序列化似乎完成了两次 - 当我们拦截 kafka 主题中生成的消息时,消费者只会将它们显示为 JSON(字符串),但现在它是一个不可读的 byte[]。此外,生产中的下游消费者无法再反序列化消息。奇怪的是,输入消息的反序列化似乎工作得很好,无论我们在消费者属性中放入什么(无论是在 Binder 还是在默认的 kafka 级别) 我们有一种感觉,这个错误“又回来了”,但我们在代码中找不到确切的位置:https://github.com/spring-cloud/spring-cloud-stream/issues/1536

我们的(丑陋的)解决方法:

@Slf4j
@Configuration
public class KafkaMessageConverterConfiguration {

    @ConditionalOnProperty(value = "spring.cloud.stream.default-binder", havingValue = "kafka")
    @Bean
    public MessageConverter validationResultConverter(BinderTypeRegistry binder, ObjectMapper objectMapper) {
        return new AbstractMessageConverter(MimeType.valueOf("application/json")) {
            @Override
            protected boolean supports(final Class<?> clazz) {
                return ValidationResult.class.isAssignableFrom(clazz);
            }

            @Override
            protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
                return payload;
            }
        };
    }
}

是否有一种“正确”的方法来设置自定义序列化器或获取以前的 native 编码?

最佳答案

所以这是 3.0.0.RELEASE 之后报告的问题 - https://github.com/spring-cloud/spring-cloud-stream/commit/74aee8102898dbff96a570d2d2624571b259e141 。该问题已得到解决,几天后将在 3.0.1.RELEASE (Horsham.SR1) 中提供。

关于java - Spring-Cloud-Stream-Kafka-Binder 功能风格忽略自定义 De/Serializer 和/或 useNativeEncoding?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59377307/

相关文章:

java - 在 PreparedStatement 中设置任何参数不起作用

java - 从 servlet 或 jsf 托管 bean 打开弹出窗口?

Spring Session——支持除 Redis 之外的其他后端实现

mysql - 无法使用 Spring Boot Hibernate 和 MySQL 存储 "Pile of Poo"unicode 表情符号

python - 使用 Python 读取特定的 Kafka 主题

java - 从kafka高级消费者获取偏移量

python - kafka-python KafkaConsumer 多分区提交偏移量

java - WebSphere Liberty Profile - 如何在部署 Spring Boot uber JAR 时添加安全约束

spring - Spring 中用于 websockets 测试的 MockHttpServletRequestBuilder 相当于什么

java - 使用图像和数据库创建 jar 文件