使用Spring-Integration-Kafka我们仍然可以使用@MessagingGateway和@Gateway。
我当前的代码如下所示:
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel",headers = {@GatewayHeader(name = "kafka_topic", value ="requestTopic"))
Order order(Item item)
}
在我的 Spring Spring 配置上:
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler kafkaMessageHandler(KafkaTemplate kafkaTemplate) {
KafkaProducerMessageHandler<String, String> messageHandler = new KafkaProducerMessageHandler<>(kafkaTemplate);
messageHandler.setMessageKeyExpression(new LiteralExpression("spring-integration-kafka"));
messageHandler.setTopicExpression(new SpelExpressionParser().parseExpression("headers.kafka_topic"));
return messageHandler;
}
通过此设置,我收到错误消息:
by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
最佳答案
您似乎没有显示完整的配置。
网关正在等待回复,但 kafkaMessageHandler
不会生成回复(除非模板是 ReplyingKafkaTemplate
),并且会丢失 replyChannel
header 。
因此,您可能正在尝试从其他地方发送回复。
如果您期望请求/回复语义;使用new outbound gateway .
其中模板必须是 ReplyingKafkaTemplate
。
关于java - 将 @Gateway 与 Spring-Integration-Kafka 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49373049/