java - 将 @Gateway 与 Spring-Integration-Kafka 一起使用

标签 java spring apache-kafka spring-integration

使用Spring-Integration-Kafka我们仍然可以使用@MessagingGateway和@Gateway。

我当前的代码如下所示:

@MessagingGateway
public interface OrderGateway {
 @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel",headers = {@GatewayHeader(name = "kafka_topic", value ="requestTopic"))
  Order order(Item item)
}

在我的 Spring Spring 配置上:

@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler kafkaMessageHandler(KafkaTemplate kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> messageHandler = new KafkaProducerMessageHandler<>(kafkaTemplate);
    messageHandler.setMessageKeyExpression(new LiteralExpression("spring-integration-kafka"));
    messageHandler.setTopicExpression(new SpelExpressionParser().parseExpression("headers.kafka_topic"));
    return messageHandler;
}

通过此设置,我收到错误消息:

by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available  

最佳答案

您似乎没有显示完整的配置。

网关正在等待回复,但 kafkaMessageHandler 不会生成回复(除非模板是 ReplyingKafkaTemplate),并且会丢失 replyChannel header 。

因此,您可能正在尝试从其他地方发送回复。

如果您期望请求/回复语义;使用new outbound gateway .

其中模板必须是 ReplyingKafkaTemplate

关于java - 将 @Gateway 与 Spring-Integration-Kafka 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49373049/

相关文章:

java - 多线程 Java 的 Hello World

java - 在类 * 中找不到属性 * 的 setter

java - 如何将异常发送到 exceptionController?

java - 如何修复 kafka-start-server.sh 上的 "java.lang.UnsupportedClassVersionError"

java - 从 EntityManagerFactory 发出连接 EntityManager

java - 使用新功能丰富类的设计模式

java - IntelliJ如何将模块添加到现有模块

apache-kafka - 是否建议在 schema.registry.url 中使用多个 URL?

apache-kafka - spring-kafka AckMode中MANUAL和MANUAL_IMMEDIATE有什么区别

java - 如何在 Java 中更改时间格式 - "am/pm"与 "AM/PM"