我正在使用这个 docker-compose 设置在本地设置 Kafka:https://github.com/wurstmeister/kafka-docker/
docker-compose up
工作正常,通过 shell 创建主题工作正常。
现在我尝试通过 spring-kafka:2.1.0.RELEASE
当启动 Spring 应用程序时,它会打印正确版本的 Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
我尝试发送这样的消息
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
在客户端发送失败
UnknownServerException: The server experienced an unexpected error when processing the request
在服务器控制台中,我收到消息 Magic v1 不支持记录 header
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
谷歌搜索提示版本冲突,但版本似乎适合(org.apache.kafka:kafka-clients:1.0.0
在类路径中)。
有什么线索吗?谢谢!
编辑: 我缩小了问题的根源。发送纯字符串有效,但通过 JsonSerializer 发送 Json 会导致给定问题。这是我的生产者配置的内容:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String
@Bean
fun producerConfigs(): Map<String, Any> =
HashMap<String, Any>().apply {
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
}
@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
DefaultKafkaProducerFactory(producerConfigs())
@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
KafkaTemplate(producerFactory())
最佳答案
我遇到了类似的问题。如果我们使用 JsonSerializer
或 JsonSerde
作为值,Kafka 默认会添加 header 。
为了防止这个问题,我们需要禁用添加信息头。
如果你对默认的 json 序列化没问题,那么使用以下(这里的关键点是 ADD_TYPE_INFO_HEADERS
):
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
但如果您需要一个带有特定 ObjectMapper
的自定义 JsonSerializer
(例如使用 PropertyNamingStrategy.SNAKE_CASE
),您应该禁用显式添加信息 header JsonSerializer
,因为spring kafka忽略了DefaultKafkaProducerFactory
的属性ADD_TYPE_INFO_HEADERS
(对我来说这是spring kafka的糟糕设计)
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
或者如果我们使用 JsonSerde
,那么:
Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
关于Spring Kafka Producer 不发送到 Kafka 1.0.0(Magic v1 不支持记录头),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47953901/