Spring Cloud Streams 没有在消息中设置 kafka 键?

标签 spring spring-boot apache-kafka spring-cloud spring-cloud-stream

故事是这样的。我有一个 kafka 代理和一个特定对象(我将其 jsonify 以通过我的主题发送),它有一个我想用作 key 的 ID。

目前我正在使用“partitionKeyExtractorClass”配置来设置提取 ID 并将其作为键返回的类。

看起来像这样:

def extractKey(Message<?> message) {
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = ${id}")

    return id
}

我的实际问题是,当我浏览有关该主题的消息时,保存我的消息的 ConsumerRecord 说 key 为空...

这是一个错误吗?难道我做错了什么?关于此的文档仅此而已。

最佳答案

看,您正在将 partitionkey 混合在一起。

目前 KafkaMessageChannelBinder 不提供根据 Message 确定 key 的选项。

KafkaHeaders.MESSAGE_KEY 是您可以有效使用的现有功能:

    Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

因此,在输出 消息之前,您应该计算 key 并将其放入该 header 中。

关于Spring Cloud Streams 没有在消息中设置 kafka 键?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40536566/

相关文章:

java - Spring Boot事务管理器建议回滚所有异常

c++ - 将 3rd 方异步 API 与 Cap'n Proto RPC 集成的好方法是什么?

java - Kafka不使用偏移量而是通过记录的字段从主题中删除记录

java - 使用 JS/jQuery/AJAX 和 Controller 在 Java 中分页

java - 如何知道组件(spring/jetty)是否完全初始化的方法

java - 如何在禁用 MongoDB 但仍安装的情况下运行 Spring?

java - 如何在代码中使用新的 Bolt 更新现有的 Storm 拓扑?

java - 当我尝试在 Spring Controller 的方法中将 json 字符串映射到 java 域类时出现 MethodArgumentConversionNotSupportedException

spring-mvc - Thymeleaf: <label> 将动态文本与静态文本 Spring MVC 连接起来

java - Couchbase & Spring 查询错误 : "Query returning a primitive type are expected to return exactly 1 result, got X"