这是我的听众
@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Collection<String> {
// get id
return doListHashes(token)
}
private fun doListHashes(token: String): Collection<String> {
val id = userService.lookupIdSync(token)
if (id == null) {
log.info("Cannot get user id with token $token")
return emptyList()
}
return cartRepo.listHashes(id).map { base32.encodeToString(it) }
}
问题是相关 ID 丢失。
No correlationId found in reply: xxx - to use request/reply semantics, the responding server must return the correlation id in the 'correlationId' header
最佳答案
事实证明我不能使用 Collection 作为返回类型。
在MessagingMessageListenerAdapter
中:
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
if (!messageReturnType && topic == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No replyTopic to handle the reply: " + result);
}
}
else if (result instanceof Message) {
this.replyTemplate.send((Message<?>) result);
}
else {
if (result instanceof Collection) {
((Collection<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
}
else {
this.replyTemplate.send(topic, v);
}
});
}
else {
sendSingleResult(result, topic, source);
}
}
}
收集结果将被视为单独的消息。
将返回类型修改为Array后就可以了。
@KafkaListener(topics = ["cartListHashes"])
@SendTo
@Transactional
fun listHashes(token: String): Array<String> {
// get id
return doListHashes(token)
}
private fun doListHashes(token: String): Array<String> {
val id = userService.lookupIdSync(token)
if (id == null) {
log.info("Cannot get user id with token $token")
return emptyArray()
}
return cartRepo.listHashes(id).map { base32.encodeToString(it) }.toTypedArray()
}
关于java - 将 Collection 与 ReplyingKafkaTemplate 结合使用时,相关 ID 会丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56959708/