java - 将 Collection 与 ReplyingKafkaTemplate 结合使用时,相关 ID 会丢失

标签 java spring-boot spring-kafka

这是我的听众

@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Collection<String> {
    // get id
    return doListHashes(token)
}

private fun doListHashes(token: String): Collection<String> {
    val id = userService.lookupIdSync(token)
    if (id == null) {
        log.info("Cannot get user id with token $token")
        return emptyList()
    }
    return cartRepo.listHashes(id).map { base32.encodeToString(it) }
}

问题是相关 ID 丢失。

No correlationId found in reply: xxx - to use request/reply semantics, the responding server must return the correlation id in the 'correlationId' header

最佳答案

事实证明我不能使用 Collection 作为返回类型。 在MessagingMessageListenerAdapter中:

protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
    if (!messageReturnType && topic == null) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("No replyTopic to handle the reply: " + result);
        }
    }
    else if (result instanceof Message) {
        this.replyTemplate.send((Message<?>) result);
    }
    else {
        if (result instanceof Collection) {
            ((Collection<V>) result).forEach(v -> {
                if (v instanceof Message) {
                    this.replyTemplate.send((Message<?>) v);
                }
                else {
                    this.replyTemplate.send(topic, v);
                }
            });
        }
        else {
            sendSingleResult(result, topic, source);
        }
    }
}

收集结果将被视为单独的消息。

将返回类型修改为Array后就可以了。

@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Array<String> {
    // get id
    return doListHashes(token)
}

private fun doListHashes(token: String): Array<String> {
    val id = userService.lookupIdSync(token)
    if (id == null) {
        log.info("Cannot get user id with token $token")
        return emptyArray()
    }
    return cartRepo.listHashes(id).map { base32.encodeToString(it) }.toTypedArray()
}

关于java - 将 Collection 与 ReplyingKafkaTemplate 结合使用时,相关 ID 会丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56959708/

相关文章:

java - Spring Boot 与 Web 服务器内存消耗

apache-kafka-streams - 根据部分数据属性更新KTable

java - spring jpa crud 存储库在多线程环境中从数据库获取旧数据

java - android 已建立的连接被主机中的软件中止

C 标准库函数的 Java 等价物

spring-boot - Netflix Zuul CORS

spring - 属性 th :each is not allowed here(Error in Thymeleaf template)

java - KafkaProducer 未生成> 1 MB 的消息到主题

java - 大端字节缓冲区中的字节顺序

java - 有没有办法访问另一个匿名类中的匿名类?