java - 如何使用 vert.x-rx 创建响应式(Reactive)客户端-服务器 TCP 通信

标签 java reactive-programming vert.x reactive

我目前正在开发一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(用 Java 编写)。众所周知,使用常规 NIO 可以轻松实现这一点。然而,作为我正在开发的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请引用下图:

enter image description here

在右侧,我的应用程序作为 TCP 服务器运行,等待来自左侧外部系统的连接。我读过要创建 TCP 并监听连接,您只需执行以下操作即可:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});

但是,我不明白的是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。

如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行响应式(Reactive)编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。

最佳答案

要从套接字读取数据,您可以使用RecordParser。在套接字连接上,数据通常由换行符分隔:

RecordParser parser = RecordParser.newDelimited("\n", sock);

RecordParser 是一个 Vert.x ReadStream,因此它可以转换为 Flowable:

FlowableHelper.toFlowable(parser)

现在,如果可以从 Buffer 创建 EchoRequestMessage:

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }

并将 EchoResponseMessage 转换为 Buffer:

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}

您可以使用 RxJava 运算符来实现回显服务器流程:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);

[编辑]如果您的协议(protocol)消息不是行分隔而是长度前缀,那么您可以创建自定义ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}

并将其转换为Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))

关于java - 如何使用 vert.x-rx 创建响应式(Reactive)客户端-服务器 TCP 通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47534495/

相关文章:

Java - 如何使用类名和方法名作为字符串从另一个类调用方法

Java GUI 框架。选择什么? Swing、SWT、AWT、SwingX、JGoodies、JavaFX、Apache Pivot?

java - 如何使用真实产品测试 android IAP 购买?

rxjs - 为什么 withLatestFrom RxJS 方法不是静态的?

java - Reactor 取消订阅的方式

javascript - 我应该在网络游戏中使用固定时间步长还是动态时间步长?

java - 如何在 try-catch block 中使用 .click() 命令 java android

swift - RxSwift 的操作顺序

java - 两个互相隔离的顶点实例

kotlin - Vert.x编写协程阻止代码的方法