我目前正在开发一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(用 Java 编写)。众所周知,使用常规 NIO 可以轻松实现这一点。然而,作为我正在开发的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信。请引用下图:
在右侧,我的应用程序作为 TCP 服务器运行,等待来自左侧外部系统的连接。我读过要创建 TCP 并监听连接,您只需执行以下操作即可:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
但是,我不明白的是当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时如何处理。我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统。
如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行响应式(Reactive)编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中。我读过有关 Observables 和订阅的内容,但我不知道要观察什么或订阅什么。任何帮助将不胜感激。
最佳答案
要从套接字读取数据,您可以使用RecordParser
。在套接字连接上,数据通常由换行符分隔:
RecordParser parser = RecordParser.newDelimited("\n", sock);
RecordParser
是一个 Vert.x ReadStream
,因此它可以转换为 Flowable
:
FlowableHelper.toFlowable(parser)
现在,如果可以从 Buffer
创建 EchoRequestMessage
:
public class EchoRequestMessage {
private String message;
public static EchoRequestMessage fromBuffer(Buffer buffer) {
// Deserialize
}
public String getMessage() {
return message;
}
}
并将 EchoResponseMessage
转换为 Buffer
:
public class EchoResponseMessage {
private final String message;
public EchoResponseMessage(String message) {
this.message = message;
}
public Buffer toBuffer() {
// Serialize;
}
}
您可以使用 RxJava 运算符来实现回显服务器流程:
vertx.createNetServer().connectHandler(sock -> {
RecordParser parser = RecordParser.newDelimited("\n", sock);
FlowableHelper.toFlowable(parser)
.map(EchoRequestMessage::fromBuffer)
.map(echoRequestMessage -> {
return new EchoResponseMessage(echoRequestMessage.getMessage());
})
.subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
throwable.printStackTrace();
sock.close();
}, sock::close);
}).listen(1234);
[编辑]如果您的协议(protocol)消息不是行分隔而是长度前缀,那么您可以创建自定义ReadStream
:
class LengthPrefixedStream implements ReadStream<Buffer> {
final RecordParser recordParser;
boolean prefix = false;
private LengthPrefixedStream(ReadStream<Buffer> stream) {
recordParser = RecordParser.newFixed(4, stream);
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
recordParser.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler == null) {
recordParser.handler(null);
return this;
}
recordParser.handler(buffer -> {
if (prefix) {
prefix = false;
recordParser.fixedSizeMode(buffer.getInt(0));
} else {
prefix = true;
recordParser.fixedSizeMode(4);
handler.handle(buffer);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
recordParser.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
recordParser.resume();
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
recordParser.endHandler(endHandler);
return this;
}
}
并将其转换为Flowable
:
FlowableHelper.toFlowable(new LengthPrefixedStream(sock))
关于java - 如何使用 vert.x-rx 创建响应式(Reactive)客户端-服务器 TCP 通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47534495/