java - Netty:如何在没有任何 header /字段指示长度的情况下在服务器上接收可变长度消息

标签 java netty

我正在尝试使用 Netty 在 java 中实现 TCP 服务器。我能够正确处理长度<1024的消息,但是当我收到超过1024的消息时,我只能看到部分消息。

我做了一些研究,发现我应该实现 replayingdecoder 但我无法理解如何实现解码方法

我的消息使用 JSON

Netty版本4.1.27

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception 

我的服务器设置

    EventLoopGroup group;

    group = new NioEventLoopGroup(this.numThreads);

    try {
        ServerBootstrap serverBootstrap;
        RequestHandler requestHandler;
        ChannelFuture channelFuture;

        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(group);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.localAddress(new InetSocketAddress("::", this.port));

        requestHandler = new RequestHandler(this.responseManager, this.logger);

        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(requestHandler);
            }
        });

        channelFuture = serverBootstrap.bind().sync();
        channelFuture.channel().closeFuture().sync();
    }
    catch(Exception e){
        this.logger.info(String.format("Unknown failure %s", e.getMessage()));
    }
    finally {
        try {
            group.shutdownGracefully().sync();
        }
        catch (InterruptedException e) {
            this.logger.info(String.format("Error shutting down %s", e.getMessage()));
        }

    }

我当前的请求处理程序

package me.chirag7jain.Response;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;

public class RequestHandler extends ChannelInboundHandlerAdapter {

private ResponseManager responseManager;
private Logger logger;

public RequestHandler(ResponseManager responseManager, Logger logger) {
    this.responseManager = responseManager;
    this.logger = logger;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf;
    String data, hostAddress;

    byteBuf = (ByteBuf) msg;
    data = byteBuf.toString(CharsetUtil.UTF_8);
    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();

    if (!data.isEmpty()) {
        String reply;

        this.logger.info(String.format("Data received %s from %s", data, hostAddress));
        reply = this.responseManager.reply(data);

        if (reply != null) {
            ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
        }
    }
    else {
        logger.info(String.format("NO Data received from %s", hostAddress));
    }
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    this.logger.info(String.format("Received Exception %s", cause.getMessage()));
    ctx.close();
}

最佳答案

我会接受channelRead()中的数据并将其累积在缓冲区中。在从channelRead()返回之前,我会在Channel上调用read()。根据您的需要,您可能需要记录其他数据。 当netty调用channelReadComplete()时,有一个时刻将整个缓冲区发送到您的ResponseManager。

Channel read(): Request to Read data from the Channel into the first inbound buffer, triggers an ChannelInboundHandler.channelRead(ChannelHandlerContext, Object) event if data was read, and triggers a channelReadComplete event so the handler can decide to continue reading.

您的 Channel 对象可通过 ctx.channel() 访问。

试试这个代码:

private final AttributeKey<StringBuffer> dataKey = AttributeKey.valueOf("dataBuf");

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf;
    String data, hostAddress;

    StringBuffer dataBuf = ctx.attr(dataKey).get();
    boolean allocBuf = dataBuf == null;
    if (allocBuf) dataBuf = new StringBuffer();

    byteBuf = (ByteBuf) msg;
    data = byteBuf.toString(CharsetUtil.UTF_8);
    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();

    if (!data.isEmpty()) {
        this.logger.info(String.format("Data received %s from %s", data, hostAddress));
    }
    else {
        logger.info(String.format("NO Data received from %s", hostAddress));
    }

    dataBuf.append(data);
    if (allocBuf) ctx.attr(dataKey).set(dataBuf);

    ctx.channel().read();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    StringBuffer dataBuf = ctx.attr(dataKey).get();
    if (dataBuf != null) {
        String reply;

        reply = this.responseManager.reply(dataBuf.toString());

        if (reply != null) {
            ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
        }
    }
    ctx.attr(dataKey).set(null);

    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

关于java - Netty:如何在没有任何 header /字段指示长度的情况下在服务器上接收可变长度消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51548745/

相关文章:

java - 使用 Universal Image Loader,内存缓存中的图像大小高于实际大小

java - OpenFire 服务器中用户的 JID 应该是什么?

netty - 从字符串构造 Netty ChannelBuffer

mongodb - Netty,服务器使用来自 mongodb 的数据连接客户端

java - LibGDX 在暂停时停止时间计数器

java - 在运行时为使用 wsimport 生成的代码覆盖或设置 Web 服务端点

redis - redisson outofDirectMemory 异常

java - 在 Resteasy + Netty 服务器上实现跨源资源共享(CORS)

java - 如何在 Intellij 中优雅地关闭 gradle/为 java 应用程序设置 java 选项