java - 如何使用 Apache Camel Netty4 在异步模式下通过已建立的 TCP 连接发回响应?

标签 java spring-boot apache-camel netty

我在消费者模式下使用 Netty4 组件 ( http://camel.apache.org/netty4.html ) 构建一个具有 Apache Camel 路由的微服务。因此,在我的微服务中,我正在构建的这条路由将通过 TCP 连接接收消息。为此,我这样做了:

@Override
public void configure() throws Exception {
 this.from("netty4:tcp://localhost:7000?textline=true&encoding=utf8")
   .process(new Processor() {
      @Override
      public void process(final Exchange exchange) throws Exception {
        log.info("[Processor] - Incoming Message -> {}", exchange.getIn().getBody(String.class));
      }
   }).to("bean:messageService");
}

嗯,我能正常收到消息。为了测试,我使用 telnet:

$ telnet localhost 7000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
TheMessage

问题是当我想将消息发送回在该路由中建立的同一 TCP channel 时。在同步模式下,我可以使用 Exchange 对象轻松地做到这一点。 但是,在异步模式下,我不知道如何向生产者发回消息。

接收和发送消息的 Spring 服务是这样的:

@Service
public class MessageService {

  private static final Logger log = LoggerFactory.getLogger(MessageService.class);

  private List<String> messageStore = new LinkedList<>();

  public void sendToTCP(final String message) {
    log.info("[Service] - Sending Message over TCP Channel --> {}", message);
  }

  @Handler
  public void receiveFromTCP(final Exchange exchange) {
    final String messageFromTcp = exchange.getIn().getBody(String.class);
    log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
    this.messageStore.add(messageFromTcp);
  }

  public List<String> getReceivedMessages() {
    return messageStore;
  }
}

在resume中,我需要的是在这个方法中放一些代码,像这样:

public void sendToTCP(final String message) {
  log.info("[Service] - Sending Message over TCP Channel --> {}", message);
  // Send message to producer here
  camelContext.createProducerTemplate.send....
}

我无法创建另一条通往生产者的路线,因为我不知道生产者 IP。我确实需要在生产者和我的应用程序之间使用已经建立的 TCP channel 。通信需要通过 TCP,其他工具(如队列)不是一种选择。


GitHub 示例项目

我在 GitHub 上上传了一个示例项目:https://github.com/rgiaviti/so-camel-netty4-tcp

绘制

This is the draw of the communication TCP communication

我正在使用:

  • Spring 启动 1.5.12;
  • Apache Camel 2.21.0;

最佳答案

根据@vikram-palakurthi 的评论找到了解决方案。

我使用了 Netty4 属性 reuseChannel。解决方案代码接近于此:

  private static final Logger log = LoggerFactory.getLogger(MessageService.class);

  private List<String> messageStore = new LinkedList<>();
  private Channel openedChannel;

  public void sendToTCP(final String message) {
    log.info("[Service] - Sending Message over TCP Channel --> {}", message);
    log.info("[Service] - Channel is Active? {}", this.openedChannel.isActive());
    log.info("[Service] - Channel is Open? {}", this.openedChannel.isOpen());
    log.info("[Service] - Channel is Writeble? {}", this.openedChannel.isWritable());

    this.openedChannel.write(message);
    this.openedChannel.flush();
  }

  @Handler
  public void receiveFromTCP(final Exchange exchange) {
    this.openedChannel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
    final String messageFromTcp = exchange.getIn().getBody(String.class);
    log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
    this.messageStore.add(messageFromTcp);
  }

Camel 解决方案

可以在这里找到带有 Camel Routes、Processor... 的完整解决方案:https://github.com/rgiaviti/so-camel-netty4-tcp/tree/solution

但是,我们需要知道这是一个简单的解决方案。开发者还需要处理多个 channel ,生产者关闭 channel ,检查 channel ...

顶点解决方案

我也开发了一个 Vertx 解决方案。 https://github.com/rgiaviti/so-camel-netty4-tcp/tree/vertx-solution

关于java - 如何使用 Apache Camel Netty4 在异步模式下通过已建立的 TCP 连接发回响应?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50043605/

相关文章:

java - 使用apache-camel多播将一个文件上传到不同位置

java - 子模块是解决此 GUI 问题的正确方法吗?

Java - 处理文件删除

mysql - Google Cloud SQL 上的 Spring Boot - org.hibernate.HibernateException : Access to DialectResolutionInfo cannot be null when 'hibernate.dialect' not set

java - Camel POST RestService 接受 JSONArray : annotations and parsing errors

java - ESB 与服务

java - 尝试使用 docx4j 扩展控件绑定(bind)和设置自定义 xsltfinisher 时出现问题

java - 时间戳到日期转换失败

java - 以 pdf 形式发送字节

spring - 具有自动配置功能的 Spring Data JPA 应用程序的多个数据库