Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头)

标签 spring spring-boot apache-kafka docker-compose spring-kafka

我正在使用这个 docker-compose 设置在本地设置 Kafka:https://github.com/wurstmeister/kafka-docker/

docker-compose up 工作正常,通过 shell 创建主题工作正常。

现在我尝试通过 spring-kafka:2.1.0.RELEASE

连接到 Kafka

当启动 Spring 应用程序时,它会打印正确版本的 Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

我尝试发送这样的消息

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

在客户端发送失败

UnknownServerException: The server experienced an unexpected error when processing the request

在服务器控制台中,我收到消息 Magic v1 不支持记录 header

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

谷歌搜索提示版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0 在类路径中)。

有什么线索吗?谢谢!

编辑: 我缩小了问题的根源。发送纯字符串有效,但通过 JsonSerializer 发送 Json 会导致给定问题。这是我的生产者配置的内容:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

最佳答案

我遇到了类似的问题。如果我们使用 JsonSerializerJsonSerde 作为值,Kafka 默认会添加 header 。 为了防止这个问题,我们需要禁用添加信息头。

如果你对默认的 json 序列化没问题,那么使用以下(这里的关键点是 ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

但如果您需要一个带有特定 ObjectMapper 的自定义 JsonSerializer(例如使用 PropertyNamingStrategy.SNAKE_CASE),您应该禁用显式添加信息 header JsonSerializer,因为spring kafka忽略了DefaultKafkaProducerFactory的属性ADD_TYPE_INFO_HEADERS(对我来说这是spring kafka的糟糕设计)

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

或者如果我们使用 JsonSerde,那么:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);

关于Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47953901/

相关文章:

java - Spring 启动: getting this error - Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured

json - Spring MVC - HttpMediaTypeNotAcceptableException

spring - Gatling不适用于Spring Boot 2.0.1.RELEASE

Spring Boot Java Kafka 配置,覆盖端口

java - 微服务中Kafka生产者引起的Big GC暂停

java - 卡夫卡经纪人在一段时间后失败了

spring - 如何使用本地 Git 存储库在 Windows 中为 Spring Cloud Config 提供 uri?

java - Camel : define jms queue name from properites file

java - 在 spring boot 中重新加载/刷新缓存

java - JPA Repository 变量的方法无法从 SpringBoot Main 方法保存