我是 Netty 新手。我必须使用静态管道(因为项目经理更喜欢它)。这有点困难,因为我必须在同一线路上处理 RTP 和 RTSP 协议(protocol)。
虽然几乎可以工作,但是存在内存泄漏。 我猜是我的分离器类出了问题。 此外,我认为错误可能是近旁路方法(因为netty的开发人员 - 为了避免不定式循环 - 不允许保持ByteBuf不变,这就是为什么我有创建旁路方法。)
如果你有什么想法,请帮助我! (提前致谢!)
这是我的代码:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.ArrayList;
import java.util.List;
public class Splitter extends ByteToMessageDecoder {
private ByteBuf bb = Unpooled.buffer();
final RtspClientHandler rtspClientHandler;
final RtpClientHandler rtpClientHandler;
public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
this.rtspClientHandler = rtspClientHandler;
this.rtpClientHandler = rtpClientHandler;
}
protected void bypass(ByteBuf in, MessageList<Object> out) {
bb.writeBytes(in);
in.discardReadBytes();
bb.retain();
out.add(bb);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
if (rtspClientHandler.getRTSPstate() == RtspClientHandler.RTSP_CLIENT_STATE.READY) {
if (in.getByte(0) == 0x24 && in.readableBytes() > 4) {
int lengthToRead = in.getUnsignedShort(2);
if (in.readableBytes() >= (lengthToRead + 4)) {
in.skipBytes(4);
if (in.getByte(16) == 0x67 || in.getByte(16) == 0x68) {
final byte bytes[] = new byte[lengthToRead];
in.readBytes(bytes);
in.discardReadBytes();
SPSPPSbuffer spspps = new SPSPPSbuffer();
spspps.setSPSPPS(bytes);
out.add(spspps);
} else {
final byte packetArray[] = new byte[lengthToRead];// copy packet.
in.readBytes(packetArray);
in.discardReadBytes();
out.add(packetArray);
}
}
} else {
bypass(in, out);
}
} else {
bypass(in, out);
}
}
}
最佳答案
看来我可以解决这个问题。
最主要的是:我必须使用一个收集器 ByteBuf,在其中收集来自网络的所有字节(我必须清除输入 ByteBuf),因为有 4 种可能的情况:
数量。字节数(在收集器中)小于 RTP block 大小
数量。字节数(在收集器中)等于 RTP block 大小
数量。字节数(在收集器中)大于 RTP block 大小
收集器 ByteBuf 中存在多个 block 。
这是代码:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageList;
import io.netty.handler.codec.ByteToMessageDecoder;
public class Splitter extends ByteToMessageDecoder {
private ByteBuf collector = Unpooled.buffer();
final RtspClientHandler rtspClientHandler;
final RtpClientHandler rtpClientHandler;
public Splitter(RtspClientHandler rtspClientHandler, RtpClientHandler rtpClientHandler) {
this.rtspClientHandler = rtspClientHandler;
this.rtpClientHandler = rtpClientHandler;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception {
collector.writeBytes(in);
in.discardReadBytes();
in.clear();
if (rtspClientHandler.getRTSPstate() != RtspClientHandler.RTSP_CLIENT_STATE.READY) {
System.out.println("RTSP communication in progress");
collector.retain();
out.add(collector);
return;
}
if (collector.readableBytes() > 0 && collector.getByte(0) != 0x24) {
System.out.println("Clearing the Unpooled.buffer() (because it does not start with 0x24)");
collector.readerIndex(collector.writerIndex());
collector.discardReadBytes();
}
System.out.println("*****New bytes arrived");
while (collector.readableBytes() > 0 && collector.getByte(0) == 0x24) {
System.out.println("Length: " + collector.readableBytes());
if (collector.readableBytes() > 4) {
int lengthToRead = collector.getUnsignedShort(2);
if (collector.readableBytes() >= (lengthToRead + 4)) {
collector.skipBytes(4);
if (collector.getByte(16) == 0x67 || collector.getByte(16) == 0x68) {
final byte bytes[] = new byte[lengthToRead];
collector.readBytes(bytes);
collector.discardReadBytes();
SPSPPSbuffer spspps = new SPSPPSbuffer();
spspps.setSPSPPS(bytes);
out.add(spspps);
} else {
final byte packetArray[] = new byte[lengthToRead];// copy packet.
collector.readBytes(packetArray);
collector.discardReadBytes();
out.add(packetArray);
}
} else {
System.out.println("Not enough length, " + (lengthToRead + 4) + " byte should be required (together with 4 bytes header)");
return;
}
} else {
System.out.println("Less than 5 bytes");
return;
}
}
}
}
关于java - 我的 ByteToMessageDecoder 类中存在内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21490123/