故事是这样的。我有一个 kafka 代理和一个特定对象(我将其 jsonify 以通过我的主题发送),它有一个我想用作 key 的 ID。
目前我正在使用“partitionKeyExtractorClass”配置来设置提取 ID 并将其作为键返回的类。
看起来像这样:
def extractKey(Message<?> message) {
log.info('Extracting key from message')
String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
log.info("Got = ${id}")
return id
}
我的实际问题是,当我浏览有关该主题的消息时,保存我的消息的 ConsumerRecord 说 key 为空...
这是一个错误吗?难道我做错了什么?关于此的文档仅此而已。
最佳答案
看,您正在将 partition
与 key
混合在一起。
目前 KafkaMessageChannelBinder
不提供根据 Message
确定 key
的选项。
KafkaHeaders.MESSAGE_KEY
是您可以有效使用的现有功能:
Object messageKey = this.messageKeyExpression != null
? this.messageKeyExpression.getValue(this.evaluationContext, message)
: message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);
因此,在输出
消息之前,您应该计算 key 并将其放入该 header 中。
关于Spring Cloud Streams 没有在消息中设置 kafka 键?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40536566/