我正在尝试使用 Netty 在 java 中实现 TCP 服务器。我能够正确处理长度<1024的消息,但是当我收到超过1024的消息时,我只能看到部分消息。
我做了一些研究,发现我应该实现 replayingdecoder 但我无法理解如何实现解码方法
我的消息使用 JSON
Netty版本4.1.27
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
我的服务器设置
EventLoopGroup group;
group = new NioEventLoopGroup(this.numThreads);
try {
ServerBootstrap serverBootstrap;
RequestHandler requestHandler;
ChannelFuture channelFuture;
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(group);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.localAddress(new InetSocketAddress("::", this.port));
requestHandler = new RequestHandler(this.responseManager, this.logger);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(requestHandler);
}
});
channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
}
catch(Exception e){
this.logger.info(String.format("Unknown failure %s", e.getMessage()));
}
finally {
try {
group.shutdownGracefully().sync();
}
catch (InterruptedException e) {
this.logger.info(String.format("Error shutting down %s", e.getMessage()));
}
}
我当前的请求处理程序
package me.chirag7jain.Response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
public class RequestHandler extends ChannelInboundHandlerAdapter {
private ResponseManager responseManager;
private Logger logger;
public RequestHandler(ResponseManager responseManager, Logger logger) {
this.responseManager = responseManager;
this.logger = logger;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf;
String data, hostAddress;
byteBuf = (ByteBuf) msg;
data = byteBuf.toString(CharsetUtil.UTF_8);
hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (!data.isEmpty()) {
String reply;
this.logger.info(String.format("Data received %s from %s", data, hostAddress));
reply = this.responseManager.reply(data);
if (reply != null) {
ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
}
}
else {
logger.info(String.format("NO Data received from %s", hostAddress));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
this.logger.info(String.format("Received Exception %s", cause.getMessage()));
ctx.close();
}
最佳答案
我会接受channelRead()中的数据并将其累积在缓冲区中。在从channelRead()返回之前,我会在Channel上调用read()。根据您的需要,您可能需要记录其他数据。 当netty调用channelReadComplete()时,有一个时刻将整个缓冲区发送到您的ResponseManager。
Channel read(): Request to Read data from the Channel into the first inbound buffer, triggers an ChannelInboundHandler.channelRead(ChannelHandlerContext, Object) event if data was read, and triggers a channelReadComplete event so the handler can decide to continue reading.
您的 Channel 对象可通过 ctx.channel() 访问。
试试这个代码:
private final AttributeKey<StringBuffer> dataKey = AttributeKey.valueOf("dataBuf");
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf;
String data, hostAddress;
StringBuffer dataBuf = ctx.attr(dataKey).get();
boolean allocBuf = dataBuf == null;
if (allocBuf) dataBuf = new StringBuffer();
byteBuf = (ByteBuf) msg;
data = byteBuf.toString(CharsetUtil.UTF_8);
hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (!data.isEmpty()) {
this.logger.info(String.format("Data received %s from %s", data, hostAddress));
}
else {
logger.info(String.format("NO Data received from %s", hostAddress));
}
dataBuf.append(data);
if (allocBuf) ctx.attr(dataKey).set(dataBuf);
ctx.channel().read();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
StringBuffer dataBuf = ctx.attr(dataKey).get();
if (dataBuf != null) {
String reply;
reply = this.responseManager.reply(dataBuf.toString());
if (reply != null) {
ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
}
}
ctx.attr(dataKey).set(null);
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
关于java - Netty:如何在没有任何 header /字段指示长度的情况下在服务器上接收可变长度消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51548745/